feat(database): add SQLite optimization and retry mechanisms
This commit is contained in:
parent
358b2e3639
commit
3233630401
2 changed files with 242 additions and 40 deletions
|
|
@ -1,6 +1,7 @@
|
||||||
import os
|
import os
|
||||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
||||||
from sqlalchemy.pool import NullPool
|
from sqlalchemy.pool import NullPool
|
||||||
|
from sqlalchemy import event, text
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from database.models.generic_model import Base
|
from database.models.generic_model import Base
|
||||||
|
|
@ -9,12 +10,40 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
DATABASE_URL = os.getenv("DATABASE_URL")
|
DATABASE_URL = os.getenv("DATABASE_URL")
|
||||||
|
|
||||||
engine = create_async_engine(
|
|
||||||
DATABASE_URL,
|
def _configure_sqlite_connection(dbapi_connection, connection_record):
|
||||||
echo=False,
|
cursor = dbapi_connection.cursor()
|
||||||
future=True,
|
cursor.execute("PRAGMA journal_mode=WAL")
|
||||||
poolclass=NullPool,
|
cursor.execute("PRAGMA busy_timeout=30000")
|
||||||
)
|
cursor.execute("PRAGMA synchronous=NORMAL")
|
||||||
|
cursor.execute("PRAGMA cache_size=-64000") # 64MB
|
||||||
|
cursor.execute("PRAGMA foreign_keys=ON")
|
||||||
|
cursor.execute("PRAGMA locking_mode=NORMAL")
|
||||||
|
cursor.close()
|
||||||
|
|
||||||
|
logger.debug("SQLite configuré avec WAL mode et busy_timeout=30s")
|
||||||
|
|
||||||
|
|
||||||
|
# Engine options shared by every backend.
engine_kwargs = {
    "echo": False,
    "future": True,
    "poolclass": NullPool,
}

# Detect SQLite from the URL *scheme* (the URL starts with the dialect name)
# rather than from a substring anywhere in the URL: a non-SQLite URL whose
# database name, host or password happens to contain "sqlite" must not receive
# SQLite-only connect arguments or PRAGMA statements.
_IS_SQLITE = bool(DATABASE_URL) and DATABASE_URL.startswith("sqlite")

if _IS_SQLITE:
    # Driver-level options forwarded to the SQLite DBAPI connect call.
    # The 30 (seconds) timeout mirrors the busy_timeout=30000 (ms) PRAGMA.
    engine_kwargs["connect_args"] = {
        "check_same_thread": False,
        "timeout": 30,
    }

engine = create_async_engine(DATABASE_URL, **engine_kwargs)

if _IS_SQLITE:

    @event.listens_for(engine.sync_engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        """Apply the SQLite PRAGMA configuration to each new DBAPI connection."""
        _configure_sqlite_connection(dbapi_connection, connection_record)
|
||||||
|
|
||||||
|
|
||||||
async_session_factory = async_sessionmaker(
|
async_session_factory = async_sessionmaker(
|
||||||
engine,
|
engine,
|
||||||
|
|
@ -30,6 +59,12 @@ async def init_db():
|
||||||
logger.info("Tentative de connexion")
|
logger.info("Tentative de connexion")
|
||||||
async with engine.begin() as conn:
|
async with engine.begin() as conn:
|
||||||
logger.info("Connexion etablie")
|
logger.info("Connexion etablie")
|
||||||
|
|
||||||
|
if DATABASE_URL and "sqlite" in DATABASE_URL:
|
||||||
|
result = await conn.execute(text("PRAGMA journal_mode"))
|
||||||
|
journal_mode = result.scalar()
|
||||||
|
logger.info(f"SQLite journal_mode: {journal_mode}")
|
||||||
|
|
||||||
await conn.run_sync(Base.metadata.create_all)
|
await conn.run_sync(Base.metadata.create_all)
|
||||||
logger.info("create_all execute")
|
logger.info("create_all execute")
|
||||||
|
|
||||||
|
|
@ -49,3 +84,57 @@ async def get_session() -> AsyncSession:
|
||||||
async def close_db():
|
async def close_db():
|
||||||
await engine.dispose()
|
await engine.dispose()
|
||||||
logger.info("Connexions DB fermées")
|
logger.info("Connexions DB fermées")
|
||||||
|
|
||||||
|
|
||||||
|
async def execute_with_sqlite_retry(
    session: AsyncSession, statement, max_retries: int = 5, base_delay: float = 0.1
):
    """Execute ``statement`` on ``session``, retrying while SQLite is locked.

    Only ``OperationalError`` exceptions whose message contains
    "database is locked" are retried; the wait between attempts grows
    exponentially (``base_delay * 2**attempt`` seconds).

    Args:
        session: Session used to run the statement.
        statement: Statement passed unchanged to ``session.execute``.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as a single attempt.
        base_delay: Delay in seconds before the second attempt.

    Returns:
        Whatever ``session.execute(statement)`` returns.

    Raises:
        OperationalError: Immediately for any error that is not a lock error,
            or the last lock error once every attempt has failed.
    """
    import asyncio
    from sqlalchemy.exc import OperationalError

    # Always try at least once: with max_retries <= 0 the loop would never run
    # and the function would previously end on `raise None` (a TypeError).
    attempts = max(1, max_retries)

    # NOTE(review): no rollback is issued between attempts — confirm that the
    # session is still usable after an OperationalError in the callers' flows.
    for attempt in range(attempts):
        try:
            return await session.execute(statement)
        except OperationalError as e:
            is_last_attempt = attempt == attempts - 1
            # Non-lock errors are never retried; on the last attempt there is
            # nothing left to wait for, so re-raise without sleeping.
            if is_last_attempt or "database is locked" not in str(e).lower():
                raise

            delay = base_delay * (2**attempt)
            logger.warning(
                f"SQLite locked, tentative {attempt + 1}/{attempts}, "
                f"retry dans {delay:.2f}s"
            )
            await asyncio.sleep(delay)
|
||||||
|
|
||||||
|
|
||||||
|
async def commit_with_retry(
    session: AsyncSession, max_retries: int = 5, base_delay: float = 0.1
):
    """Commit ``session``, retrying while SQLite reports a locked database.

    Only ``OperationalError`` exceptions whose message contains
    "database is locked" are retried; the wait between attempts grows
    exponentially (``base_delay * 2**attempt`` seconds).

    Args:
        session: Session to commit.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as a single attempt.
        base_delay: Delay in seconds before the second attempt.

    Raises:
        OperationalError: Immediately for any error that is not a lock error,
            or the last lock error once every attempt has failed.
    """
    import asyncio
    from sqlalchemy.exc import OperationalError

    # Always try at least once: with max_retries <= 0 the loop would never run
    # and the function would previously end on `raise None` (a TypeError).
    attempts = max(1, max_retries)

    # NOTE(review): no rollback is issued between attempts — confirm that the
    # session can still be committed after a failed commit.
    for attempt in range(attempts):
        try:
            await session.commit()
            return
        except OperationalError as e:
            is_last_attempt = attempt == attempts - 1
            # Non-lock errors are never retried; on the last attempt there is
            # nothing left to wait for, so re-raise without sleeping.
            if is_last_attempt or "database is locked" not in str(e).lower():
                raise

            delay = base_delay * (2**attempt)
            logger.warning(
                f"SQLite locked lors du commit, tentative {attempt + 1}/{attempts}, "
                f"retry dans {delay:.2f}s"
            )
            await asyncio.sleep(delay)
|
||||||
|
|
|
||||||
181
email_queue.py
181
email_queue.py
|
|
@ -20,6 +20,7 @@ from io import BytesIO
|
||||||
from reportlab.lib.units import mm
|
from reportlab.lib.units import mm
|
||||||
from reportlab.lib.colors import HexColor, Color
|
from reportlab.lib.colors import HexColor, Color
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
from sqlalchemy.exc import OperationalError
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -74,6 +75,46 @@ def _register_sage_font():
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def execute_with_retry(
    session,
    operation,
    max_retries: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
):
    """Await ``operation()``, retrying while SQLite reports a locked database.

    Only ``OperationalError`` exceptions whose message contains
    "database is locked" are retried. The wait between attempts grows
    exponentially from ``base_delay``, is capped at ``max_delay`` and then
    increased by up to 10% of random jitter. Any other exception propagates
    immediately.

    Args:
        session: Not used by this function; kept so existing callers keep
            working.
        operation: Zero-argument callable returning an awaitable.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as a single attempt.
        base_delay: Delay in seconds before the second attempt.
        max_delay: Upper bound in seconds for the delay before jitter.

    Returns:
        The value produced by ``await operation()``.

    Raises:
        OperationalError: Immediately for any error that is not a lock error,
            or the last lock error once every attempt has failed.
    """
    import random

    # Always try at least once: with max_retries <= 0 the loop would never run
    # and the function would previously end on `raise None` (a TypeError).
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            return await operation()
        except OperationalError as e:
            # Any OperationalError other than a SQLite lock is not retried.
            if "database is locked" not in str(e).lower():
                raise

            # Every attempt failed: report and re-raise without a final sleep.
            if attempt == attempts - 1:
                logger.error(f"Échec après {attempts} tentatives: {e}")
                raise

            delay = min(base_delay * (2**attempt), max_delay)
            # Random jitter so concurrent workers do not retry in lockstep.
            delay += random.uniform(0, delay * 0.1)

            logger.warning(
                f"SQLite locked (tentative {attempt + 1}/{attempts}), "
                f"retry dans {delay:.2f}s"
            )
            await asyncio.sleep(delay)
|
||||||
|
|
||||||
|
|
||||||
class EmailQueue:
|
class EmailQueue:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.queue = queue.Queue()
|
self.queue = queue.Queue()
|
||||||
|
|
@ -81,8 +122,10 @@ class EmailQueue:
|
||||||
self.running = False
|
self.running = False
|
||||||
self.session_factory = None
|
self.session_factory = None
|
||||||
self.sage_client = None
|
self.sage_client = None
|
||||||
|
# Lock pour synchroniser les accès DB dans le worker
|
||||||
|
self._db_lock = asyncio.Lock()
|
||||||
|
|
||||||
def start(self, num_workers: int = 3):
|
def start(self, num_workers: int = 2): # Réduire le nombre de workers pour SQLite
|
||||||
if self.running:
|
if self.running:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -103,8 +146,14 @@ class EmailQueue:
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def enqueue(self, email_log_id: str):
|
def enqueue(self, email_log_id: str, delay_seconds: float = 0):
|
||||||
self.queue.put(email_log_id)
|
if delay_seconds > 0:
|
||||||
|
timer = threading.Timer(delay_seconds, lambda: self.queue.put(email_log_id))
|
||||||
|
timer.daemon = True
|
||||||
|
timer.start()
|
||||||
|
logger.debug(f"Email {email_log_id} planifié dans {delay_seconds}s")
|
||||||
|
else:
|
||||||
|
self.queue.put(email_log_id)
|
||||||
|
|
||||||
def _worker(self):
|
def _worker(self):
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
|
|
@ -119,7 +168,7 @@ class EmailQueue:
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Erreur worker: {e}")
|
logger.error(f"Erreur worker: {e}", exc_info=True)
|
||||||
try:
|
try:
|
||||||
self.queue.task_done()
|
self.queue.task_done()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|
@ -130,47 +179,109 @@ class EmailQueue:
|
||||||
async def _process_email(self, email_log_id: str):
|
async def _process_email(self, email_log_id: str):
|
||||||
from database import EmailLog, StatutEmail
|
from database import EmailLog, StatutEmail
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.exc import OperationalError
|
||||||
|
|
||||||
if not self.session_factory:
|
if not self.session_factory:
|
||||||
logger.error("session_factory non configuré")
|
logger.error("session_factory non configuré")
|
||||||
return
|
return
|
||||||
|
|
||||||
async with self.session_factory() as session:
|
max_db_retries = 5
|
||||||
result = await session.execute(
|
|
||||||
select(EmailLog).where(EmailLog.id == email_log_id)
|
|
||||||
)
|
|
||||||
email_log = result.scalar_one_or_none()
|
|
||||||
|
|
||||||
if not email_log:
|
|
||||||
logger.error(f"Email log {email_log_id} introuvable")
|
|
||||||
return
|
|
||||||
|
|
||||||
email_log.statut = StatutEmail.EN_COURS
|
|
||||||
email_log.nb_tentatives += 1
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
|
for db_attempt in range(max_db_retries):
|
||||||
try:
|
try:
|
||||||
await self._send_with_retry(email_log)
|
async with self.session_factory() as session:
|
||||||
email_log.statut = StatutEmail.ENVOYE
|
# Lecture de l'email log avec retry
|
||||||
email_log.date_envoi = datetime.now()
|
async def fetch_email():
|
||||||
email_log.derniere_erreur = None
|
result = await session.execute(
|
||||||
|
select(EmailLog).where(EmailLog.id == email_log_id)
|
||||||
|
)
|
||||||
|
return result.scalar_one_or_none()
|
||||||
|
|
||||||
except Exception as e:
|
email_log = await execute_with_retry(session, fetch_email)
|
||||||
error_msg = str(e)
|
|
||||||
email_log.statut = StatutEmail.ERREUR
|
|
||||||
email_log.derniere_erreur = error_msg[:1000]
|
|
||||||
|
|
||||||
if email_log.nb_tentatives < settings.max_retry_attempts:
|
if not email_log:
|
||||||
delay = settings.retry_delay_seconds * (
|
logger.error(f"Email log {email_log_id} introuvable")
|
||||||
2 ** (email_log.nb_tentatives - 1)
|
return
|
||||||
|
|
||||||
|
# Mise à jour du statut avec retry
|
||||||
|
async def update_status_en_cours():
|
||||||
|
email_log.statut = StatutEmail.EN_COURS
|
||||||
|
email_log.nb_tentatives += 1
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
await execute_with_retry(session, update_status_en_cours)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Envoi de l'email (pas de DB ici)
|
||||||
|
await self._send_with_retry(email_log)
|
||||||
|
|
||||||
|
# Mise à jour succès avec retry
|
||||||
|
async def update_status_success():
|
||||||
|
email_log.statut = StatutEmail.ENVOYE
|
||||||
|
email_log.date_envoi = datetime.now()
|
||||||
|
email_log.derniere_erreur = None
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
await execute_with_retry(session, update_status_success)
|
||||||
|
|
||||||
|
logger.info(f"✅ Email envoyé: {email_log.destinataire}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = str(e)
|
||||||
|
logger.error(f"Erreur envoi email: {error_msg}")
|
||||||
|
|
||||||
|
# Mise à jour erreur avec retry
|
||||||
|
async def update_status_error():
|
||||||
|
email_log.statut = StatutEmail.ERREUR
|
||||||
|
email_log.derniere_erreur = error_msg[:1000]
|
||||||
|
|
||||||
|
if email_log.nb_tentatives < settings.max_retry_attempts:
|
||||||
|
delay = settings.retry_delay_seconds * (
|
||||||
|
2 ** (email_log.nb_tentatives - 1)
|
||||||
|
)
|
||||||
|
email_log.prochain_retry = datetime.now() + timedelta(
|
||||||
|
seconds=delay
|
||||||
|
)
|
||||||
|
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
await execute_with_retry(session, update_status_error)
|
||||||
|
|
||||||
|
# Replanifier si tentatives restantes
|
||||||
|
if email_log.nb_tentatives < settings.max_retry_attempts:
|
||||||
|
delay = settings.retry_delay_seconds * (
|
||||||
|
2 ** (email_log.nb_tentatives - 1)
|
||||||
|
)
|
||||||
|
self.enqueue(email_log_id, delay_seconds=delay)
|
||||||
|
logger.info(
|
||||||
|
f"Email {email_log_id} replanifié dans {delay}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Sortir de la boucle de retry si tout s'est bien passé
|
||||||
|
return
|
||||||
|
|
||||||
|
except OperationalError as e:
|
||||||
|
if "database is locked" in str(e).lower():
|
||||||
|
delay = 0.5 * (2**db_attempt)
|
||||||
|
logger.warning(
|
||||||
|
f"DB locked lors du traitement email {email_log_id}, "
|
||||||
|
f"tentative {db_attempt + 1}/{max_db_retries}, "
|
||||||
|
f"retry dans {delay:.1f}s"
|
||||||
)
|
)
|
||||||
email_log.prochain_retry = datetime.now() + timedelta(seconds=delay)
|
await asyncio.sleep(delay)
|
||||||
|
else:
|
||||||
|
logger.error(f"Erreur DB non récupérable: {e}")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Erreur inattendue traitement email: {e}", exc_info=True)
|
||||||
|
raise
|
||||||
|
|
||||||
timer = threading.Timer(delay, self.enqueue, args=[email_log_id])
|
# Si on arrive ici, toutes les tentatives ont échoué
|
||||||
timer.daemon = True
|
logger.error(
|
||||||
timer.start()
|
f"Échec définitif traitement email {email_log_id} après {max_db_retries} tentatives DB"
|
||||||
|
)
|
||||||
await session.commit()
|
# Replanifier l'email pour plus tard
|
||||||
|
self.enqueue(email_log_id, delay_seconds=30)
|
||||||
|
|
||||||
async def _send_with_retry(self, email_log):
|
async def _send_with_retry(self, email_log):
|
||||||
msg = MIMEMultipart()
|
msg = MIMEMultipart()
|
||||||
|
|
@ -239,6 +350,8 @@ class EmailQueue:
|
||||||
# Fermeture
|
# Fermeture
|
||||||
server.quit()
|
server.quit()
|
||||||
|
|
||||||
|
logger.info(f"SMTP: Email envoyé à {msg['To']}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if server:
|
if server:
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue