From 323363040101e9781a79a22f820debfb862170de Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Tue, 13 Jan 2026 17:14:55 +0300 Subject: [PATCH] feat(database): add SQLite optimization and retry mechanisms --- database/db_config.py | 101 +++++++++++++++++++++-- email_queue.py | 181 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 242 insertions(+), 40 deletions(-) diff --git a/database/db_config.py b/database/db_config.py index ab89bbb..bb98f5c 100644 --- a/database/db_config.py +++ b/database/db_config.py @@ -1,6 +1,7 @@ import os from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.pool import NullPool +from sqlalchemy import event, text import logging from database.models.generic_model import Base @@ -9,12 +10,40 @@ logger = logging.getLogger(__name__) DATABASE_URL = os.getenv("DATABASE_URL") -engine = create_async_engine( - DATABASE_URL, - echo=False, - future=True, - poolclass=NullPool, -) + +def _configure_sqlite_connection(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA busy_timeout=30000") + cursor.execute("PRAGMA synchronous=NORMAL") + cursor.execute("PRAGMA cache_size=-64000") # 64MB + cursor.execute("PRAGMA foreign_keys=ON") + cursor.execute("PRAGMA locking_mode=NORMAL") + cursor.close() + + logger.debug("SQLite configuré avec WAL mode et busy_timeout=30s") + + +engine_kwargs = { + "echo": False, + "future": True, + "poolclass": NullPool, +} + +if DATABASE_URL and "sqlite" in DATABASE_URL: + engine_kwargs["connect_args"] = { + "check_same_thread": False, + "timeout": 30, + } + +engine = create_async_engine(DATABASE_URL, **engine_kwargs) + +if DATABASE_URL and "sqlite" in DATABASE_URL: + + @event.listens_for(engine.sync_engine, "connect") + def set_sqlite_pragma(dbapi_connection, connection_record): + _configure_sqlite_connection(dbapi_connection, connection_record) + async_session_factory = async_sessionmaker( engine, @@ -30,6 +59,12 @@ async def init_db(): logger.info("Tentative de connexion") async with engine.begin() as conn: logger.info("Connexion etablie") + + if DATABASE_URL and "sqlite" in DATABASE_URL: + result = await conn.execute(text("PRAGMA journal_mode")) + journal_mode = result.scalar() + logger.info(f"SQLite journal_mode: {journal_mode}") + await conn.run_sync(Base.metadata.create_all) logger.info("create_all execute") @@ -49,3 +84,57 @@ async def get_session() -> AsyncSession: async def close_db(): await engine.dispose() logger.info("Connexions DB fermées") + + +async def execute_with_sqlite_retry( + session: AsyncSession, statement, max_retries: int = 5, base_delay: float = 0.1 +): + import asyncio + from sqlalchemy.exc import OperationalError + + last_error = None + + for attempt in range(max_retries): + try: + result = await session.execute(statement) + return result + except OperationalError as e: + last_error = e + if "database is locked" in str(e).lower(): + delay = base_delay * (2**attempt) + logger.warning( + f"SQLite locked, tentative {attempt + 1}/{max_retries}, " + f"retry dans {delay:.2f}s" + ) + await asyncio.sleep(delay) + else: + raise + + raise last_error + + +async def commit_with_retry( + session: AsyncSession, max_retries: int = 5, base_delay: float = 0.1 +): + import asyncio + from sqlalchemy.exc import OperationalError + + last_error = None + + for attempt in range(max_retries): + try: + await session.commit() + return + except OperationalError as e: + last_error = e + if "database is locked" in str(e).lower(): + delay = base_delay * (2**attempt) + logger.warning( + f"SQLite locked lors du commit, tentative {attempt + 1}/{max_retries}, " + f"retry dans {delay:.2f}s" + ) + await asyncio.sleep(delay) + else: + raise + + raise last_error diff --git a/email_queue.py b/email_queue.py index a3104aa..16b2fcc 100644 --- a/email_queue.py +++ b/email_queue.py @@ -20,6 +20,7 @@ from io import BytesIO from reportlab.lib.units import mm from reportlab.lib.colors import HexColor, Color from PIL import Image +from sqlalchemy.exc import OperationalError logger = logging.getLogger(__name__) @@ -74,6 +75,46 @@ def _register_sage_font(): return False +async def execute_with_retry( + session, + operation, + max_retries: int = 5, + base_delay: float = 0.1, + max_delay: float = 2.0, +): + last_exception = None + + for attempt in range(max_retries): + try: + result = await operation() + return result + except OperationalError as e: + last_exception = e + # Vérifier si c'est un lock SQLite + if "database is locked" in str(e).lower(): + delay = min(base_delay * (2**attempt), max_delay) + # Ajouter un jitter aléatoire pour éviter les collisions + import random + + delay += random.uniform(0, delay * 0.1) + + logger.warning( + f"SQLite locked (tentative {attempt + 1}/{max_retries}), " + f"retry dans {delay:.2f}s" + ) + await asyncio.sleep(delay) + else: + # Autre erreur OperationalError, ne pas retry + raise + except Exception as e: + # Autres exceptions, ne pas retry + raise + + # Toutes les tentatives ont échoué + logger.error(f"Échec après {max_retries} tentatives: {last_exception}") + raise last_exception + + class EmailQueue: def __init__(self): self.queue = queue.Queue() @@ -81,8 +122,10 @@ class EmailQueue: self.running = False self.session_factory = None self.sage_client = None + # Lock pour synchroniser les accès DB dans le worker + self._db_lock = asyncio.Lock() - def start(self, num_workers: int = 3): + def start(self, num_workers: int = 2): # Réduire le nombre de workers pour SQLite if self.running: return @@ -103,8 +146,14 @@ class EmailQueue: except Exception: pass - def enqueue(self, email_log_id: str): - self.queue.put(email_log_id) + def enqueue(self, email_log_id: str, delay_seconds: float = 0): + if delay_seconds > 0: + timer = threading.Timer(delay_seconds, lambda: self.queue.put(email_log_id)) + timer.daemon = True + timer.start() + logger.debug(f"Email {email_log_id} planifié dans {delay_seconds}s") + else: + self.queue.put(email_log_id) def _worker(self): loop = asyncio.new_event_loop() @@ -119,7 +168,7 @@ class EmailQueue: except queue.Empty: continue except Exception as e: - logger.error(f"Erreur worker: {e}") + logger.error(f"Erreur worker: {e}", exc_info=True) try: self.queue.task_done() except Exception: @@ -130,47 +179,109 @@ class EmailQueue: async def _process_email(self, email_log_id: str): from database import EmailLog, StatutEmail from sqlalchemy import select + from sqlalchemy.exc import OperationalError if not self.session_factory: logger.error("session_factory non configuré") return - async with self.session_factory() as session: - result = await session.execute( - select(EmailLog).where(EmailLog.id == email_log_id) - ) - email_log = result.scalar_one_or_none() - - if not email_log: - logger.error(f"Email log {email_log_id} introuvable") - return - - email_log.statut = StatutEmail.EN_COURS - email_log.nb_tentatives += 1 - await session.commit() + max_db_retries = 5 + for db_attempt in range(max_db_retries): try: - await self._send_with_retry(email_log) - email_log.statut = StatutEmail.ENVOYE - email_log.date_envoi = datetime.now() - email_log.derniere_erreur = None + async with self.session_factory() as session: + # Lecture de l'email log avec retry + async def fetch_email(): + result = await session.execute( + select(EmailLog).where(EmailLog.id == email_log_id) + ) + return result.scalar_one_or_none() - except Exception as e: - error_msg = str(e) - email_log.statut = StatutEmail.ERREUR - email_log.derniere_erreur = error_msg[:1000] + email_log = await execute_with_retry(session, fetch_email) - if email_log.nb_tentatives < settings.max_retry_attempts: - delay = settings.retry_delay_seconds * ( - 2 ** (email_log.nb_tentatives - 1) + if not email_log: + logger.error(f"Email log {email_log_id} introuvable") + return + + # Mise à jour du statut avec retry + async def update_status_en_cours(): + email_log.statut = StatutEmail.EN_COURS + email_log.nb_tentatives += 1 + await session.commit() + + await execute_with_retry(session, update_status_en_cours) + + try: + # Envoi de l'email (pas de DB ici) + await self._send_with_retry(email_log) + + # Mise à jour succès avec retry + async def update_status_success(): + email_log.statut = StatutEmail.ENVOYE + email_log.date_envoi = datetime.now() + email_log.derniere_erreur = None + await session.commit() + + await execute_with_retry(session, update_status_success) + + logger.info(f"✅ Email envoyé: {email_log.destinataire}") + + except Exception as e: + error_msg = str(e) + logger.error(f"Erreur envoi email: {error_msg}") + + # Mise à jour erreur avec retry + async def update_status_error(): + email_log.statut = StatutEmail.ERREUR + email_log.derniere_erreur = error_msg[:1000] + + if email_log.nb_tentatives < settings.max_retry_attempts: + delay = settings.retry_delay_seconds * ( + 2 ** (email_log.nb_tentatives - 1) + ) + email_log.prochain_retry = datetime.now() + timedelta( + seconds=delay + ) + + await session.commit() + + await execute_with_retry(session, update_status_error) + + # Replanifier si tentatives restantes + if email_log.nb_tentatives < settings.max_retry_attempts: + delay = settings.retry_delay_seconds * ( + 2 ** (email_log.nb_tentatives - 1) + ) + self.enqueue(email_log_id, delay_seconds=delay) + logger.info( + f"Email {email_log_id} replanifié dans {delay}s" + ) + + # Sortir de la boucle de retry si tout s'est bien passé + return + + except OperationalError as e: + if "database is locked" in str(e).lower(): + delay = 0.5 * (2**db_attempt) + logger.warning( + f"DB locked lors du traitement email {email_log_id}, " + f"tentative {db_attempt + 1}/{max_db_retries}, " + f"retry dans {delay:.1f}s" ) - email_log.prochain_retry = datetime.now() + timedelta(seconds=delay) + await asyncio.sleep(delay) + else: + logger.error(f"Erreur DB non récupérable: {e}") + raise + except Exception as e: + logger.error(f"Erreur inattendue traitement email: {e}", exc_info=True) + raise - timer = threading.Timer(delay, self.enqueue, args=[email_log_id]) - timer.daemon = True - timer.start() - - await session.commit() + # Si on arrive ici, toutes les tentatives ont échoué + logger.error( + f"Échec définitif traitement email {email_log_id} après {max_db_retries} tentatives DB" + ) + # Replanifier l'email pour plus tard + self.enqueue(email_log_id, delay_seconds=30) async def _send_with_retry(self, email_log): msg = MIMEMultipart() @@ -239,6 +350,8 @@ class EmailQueue: # Fermeture server.quit() + logger.info(f"SMTP: Email envoyé à {msg['To']}") + except Exception as e: if server: try: