feat(enterprise): add enterprise search functionality with API integration

This commit is contained in:
Fanilo-Nantenaina 2026-01-13 18:20:30 +03:00
parent 30ffc7a493
commit e7003d4059
8 changed files with 338 additions and 86 deletions

4
api.py
View file

@ -17,7 +17,6 @@ from sqlalchemy import select
import os
from pathlib import Path as FilePath
from data.data import TAGS_METADATA, templates_signature_email
from routes.auth import router as auth_router
from config.config import settings
from database import (
init_db,
@ -78,8 +77,10 @@ from schemas.tiers.commercial import (
CollaborateurUpdate,
)
from utils.normalization import normaliser_type_tiers
from routes.auth import router as auth_router
from routes.sage_gateway import router as sage_gateway_router
from routes.universign import router as universign_router
from routes.enterprise import router as entreprises_router
from services.universign_sync import UniversignSyncService, UniversignSyncScheduler
@ -172,6 +173,7 @@ app.add_middleware(
app.include_router(auth_router)
app.include_router(sage_gateway_router)
app.include_router(universign_router)
app.include_router(entreprises_router)
@app.get("/clients", response_model=List[ClientDetails], tags=["Clients"])

View file

@ -21,6 +21,8 @@ from reportlab.lib.units import mm
from reportlab.lib.colors import HexColor, Color
from PIL import Image
from sqlalchemy.exc import OperationalError
logger = logging.getLogger(__name__)
@ -74,11 +76,6 @@ def _register_sage_font():
return False
# ============================================================================
# HELPERS POUR GESTION DES LOCKS SQLite
# ============================================================================
async def execute_with_retry(
session,
operation,
@ -86,19 +83,6 @@ async def execute_with_retry(
base_delay: float = 0.1,
max_delay: float = 2.0,
):
"""
Exécute une opération async avec retry en cas de lock SQLite.
Args:
session: Session SQLAlchemy async
operation: Coroutine à exécuter
max_retries: Nombre max de tentatives
base_delay: Délai initial entre les tentatives (secondes)
max_delay: Délai maximum entre les tentatives
"""
import sqlite3
from sqlalchemy.exc import OperationalError
last_exception = None
for attempt in range(max_retries):
@ -107,10 +91,8 @@ async def execute_with_retry(
return result
except OperationalError as e:
last_exception = e
# Vérifier si c'est un lock SQLite
if "database is locked" in str(e).lower():
delay = min(base_delay * (2**attempt), max_delay)
# Ajouter un jitter aléatoire pour éviter les collisions
import random
delay += random.uniform(0, delay * 0.1)
@ -121,13 +103,10 @@ async def execute_with_retry(
)
await asyncio.sleep(delay)
else:
# Autre erreur OperationalError, ne pas retry
raise
except Exception as e:
# Autres exceptions, ne pas retry
except Exception:
raise
# Toutes les tentatives ont échoué
logger.error(f"Échec après {max_retries} tentatives: {last_exception}")
raise last_exception
@ -139,10 +118,9 @@ class EmailQueue:
self.running = False
self.session_factory = None
self.sage_client = None
# Lock pour synchroniser les accès DB dans le worker
self._db_lock = asyncio.Lock()
def start(self, num_workers: int = 2): # Réduire le nombre de workers pour SQLite
def start(self, num_workers: int = 2):
if self.running:
return
@ -164,13 +142,6 @@ class EmailQueue:
pass
def enqueue(self, email_log_id: str, delay_seconds: float = 0):
"""
Ajoute un email à la queue avec un délai optionnel.
Args:
email_log_id: ID de l'email log
delay_seconds: Délai avant traitement (pour éviter les conflits)
"""
if delay_seconds > 0:
timer = threading.Timer(delay_seconds, lambda: self.queue.put(email_log_id))
timer.daemon = True
@ -214,7 +185,7 @@ class EmailQueue:
for db_attempt in range(max_db_retries):
try:
async with self.session_factory() as session:
# Lecture de l'email log avec retry
async def fetch_email():
result = await session.execute(
select(EmailLog).where(EmailLog.id == email_log_id)
@ -227,7 +198,6 @@ class EmailQueue:
logger.error(f"Email log {email_log_id} introuvable")
return
# Mise à jour du statut avec retry
async def update_status_en_cours():
email_log.statut = StatutEmail.EN_COURS
email_log.nb_tentatives += 1
@ -236,10 +206,8 @@ class EmailQueue:
await execute_with_retry(session, update_status_en_cours)
try:
# Envoi de l'email (pas de DB ici)
await self._send_with_retry(email_log)
# Mise à jour succès avec retry
async def update_status_success():
email_log.statut = StatutEmail.ENVOYE
email_log.date_envoi = datetime.now()
@ -248,13 +216,12 @@ class EmailQueue:
await execute_with_retry(session, update_status_success)
logger.info(f" Email envoyé: {email_log.destinataire}")
logger.info(f" Email envoyé: {email_log.destinataire}")
except Exception as e:
error_msg = str(e)
logger.error(f"Erreur envoi email: {error_msg}")
# Mise à jour erreur avec retry
async def update_status_error():
email_log.statut = StatutEmail.ERREUR
email_log.derniere_erreur = error_msg[:1000]
@ -271,7 +238,6 @@ class EmailQueue:
await execute_with_retry(session, update_status_error)
# Replanifier si tentatives restantes
if email_log.nb_tentatives < settings.max_retry_attempts:
delay = settings.retry_delay_seconds * (
2 ** (email_log.nb_tentatives - 1)
@ -281,7 +247,6 @@ class EmailQueue:
f"Email {email_log_id} replanifié dans {delay}s"
)
# Sortir de la boucle de retry si tout s'est bien passé
return
except OperationalError as e:
@ -300,11 +265,9 @@ class EmailQueue:
logger.error(f"Erreur inattendue traitement email: {e}", exc_info=True)
raise
# Si on arrive ici, toutes les tentatives ont échoué
logger.error(
f"Échec définitif traitement email {email_log_id} après {max_db_retries} tentatives DB"
)
# Replanifier l'email pour plus tard
self.enqueue(email_log_id, delay_seconds=30)
async def _send_with_retry(self, email_log):
@ -314,7 +277,6 @@ class EmailQueue:
msg["Subject"] = email_log.sujet
msg.attach(MIMEText(email_log.corps_html, "html"))
# Attachement des PDFs
if email_log.document_ids:
document_ids = email_log.document_ids.split(",")
type_doc = email_log.type_document
@ -339,39 +301,31 @@ class EmailQueue:
except Exception as e:
logger.error(f"Erreur génération PDF {doc_id}: {e}")
# Envoi SMTP
await asyncio.to_thread(self._send_smtp, msg)
def _send_smtp(self, msg):
server = None
try:
# Résolution DNS
socket.getaddrinfo(settings.smtp_host, settings.smtp_port)
# Connexion
server = smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30)
# EHLO
server.ehlo()
# STARTTLS
if settings.smtp_use_tls:
if server.has_extn("STARTTLS"):
context = ssl.create_default_context()
server.starttls(context=context)
server.ehlo()
# Authentification
if settings.smtp_user and settings.smtp_password:
server.login(settings.smtp_user, settings.smtp_password)
# Envoi
refused = server.send_message(msg)
if refused:
raise Exception(f"Destinataires refusés: {refused}")
# Fermeture
server.quit()
logger.info(f"SMTP: Email envoyé à {msg['To']}")
@ -396,11 +350,9 @@ class EmailQueue:
if not doc:
raise Exception(f"Document {doc_id} introuvable")
# Récupérer les informations de la société émettrice
societe_info = None
try:
societe_info = self.sage_client.lire_informations_societe()
# Log selon le type (dict ou objet)
if societe_info:
if isinstance(societe_info, dict):
raison = societe_info.get("raison_sociale", "N/A")
@ -410,13 +362,11 @@ class EmailQueue:
except Exception as e:
logger.warning(f"Impossible de récupérer les infos société: {e}")
# Générer le PDF avec le nouveau générateur
generator = SagePDFGenerator(doc, type_doc, societe_info=societe_info)
return generator.generate()
class SagePDFGenerator:
# Couleurs Sage
SAGE_GREEN = HexColor("#00D639")
SAGE_GREEN_DARK = HexColor("#00B830")
GRAY_50 = HexColor("#F9FAFB")
@ -430,13 +380,11 @@ class SagePDFGenerator:
GRAY_800 = HexColor("#1F2937")
GRAY_900 = HexColor("#111827")
# Valeurs par défaut (fallback si société non disponible)
DEFAULT_COMPANY_NAME = "Entreprise"
DEFAULT_COMPANY_ADDRESS = ""
DEFAULT_COMPANY_CITY = ""
DEFAULT_COMPANY_EMAIL = ""
# Labels des types de documents
TYPE_LABELS = {
0: "Devis",
10: "Commande",
@ -451,19 +399,16 @@ class SagePDFGenerator:
self.type_label = self.TYPE_LABELS.get(type_doc, "Document")
self.societe_info = societe_info
# Configuration de la page
self.page_width, self.page_height = A4
self.margin = 15 * mm
self.content_width = self.page_width - 2 * self.margin
# État du générateur
self.buffer = BytesIO()
self.pdf = None
self.current_y = 0
self.page_number = 1
self.total_pages = 1 # Sera calculé
self.total_pages = 1
# Initialiser les polices
_register_sage_font()
self.use_sage_font = _sage_font_registered
@ -472,7 +417,6 @@ class SagePDFGenerator:
if self.societe_info is None:
return default
# Support dict ou objet Pydantic
if isinstance(self.societe_info, dict):
value = self.societe_info.get(field)
else:
@ -514,7 +458,6 @@ class SagePDFGenerator:
def _get_societe_email(self) -> str:
"""Retourne l'email de la société."""
# Priorité à email_societe, puis email
email = self._get_societe_field("email_societe", "")
if not email:
email = self._get_societe_field("email", "")
@ -668,7 +611,6 @@ class SagePDFGenerator:
col1_x, y, "ÉMETTEUR", font_size=8, bold=True, color=self.GRAY_400
)
# === INFORMATIONS SOCIÉTÉ (dynamiques) ===
y_emetteur = y - 6 * mm
self._draw_text(
col1_x,
@ -679,18 +621,15 @@ class SagePDFGenerator:
color=self.GRAY_800,
)
# Adresse ligne 1
address_line1 = self._get_societe_address_line1()
if address_line1:
y_emetteur -= 5 * mm
# Tronquer si trop long
if len(address_line1) > 45:
address_line1 = address_line1[:42] + "..."
self._draw_text(
col1_x, y_emetteur, address_line1, font_size=9, color=self.GRAY_600
)
# Adresse ligne 2 (CP + Ville)
address_line2 = self._get_societe_address_line2()
if address_line2:
y_emetteur -= 4 * mm
@ -698,7 +637,6 @@ class SagePDFGenerator:
col1_x, y_emetteur, address_line2, font_size=9, color=self.GRAY_600
)
# Email
societe_email = self._get_societe_email()
if societe_email:
y_emetteur -= 5 * mm
@ -706,7 +644,6 @@ class SagePDFGenerator:
col1_x, y_emetteur, societe_email, font_size=9, color=self.GRAY_600
)
# Téléphone (optionnel)
societe_phone = self._get_societe_phone()
if societe_phone:
y_emetteur -= 4 * mm
@ -718,12 +655,10 @@ class SagePDFGenerator:
color=self.GRAY_500,
)
# === DESTINATAIRE ===
box_padding = 4 * mm
box_height = 26 * mm
box_y = y - 3 * mm
# Fond gris arrondi
self.pdf.setFillColor(self.GRAY_50)
self.pdf.roundRect(
col2_x - box_padding,
@ -907,7 +842,6 @@ class SagePDFGenerator:
align="right",
)
# TVA
taux_tva = ligne.get("taux_taxe1") or ligne.get("taux_tva") or 20
self._draw_text(
col_tva,
@ -967,7 +901,6 @@ class SagePDFGenerator:
)
y -= 6 * mm
# TVA
total_ttc = self.doc.get("total_ttc") or 0
tva = total_ttc - total_ht + remise_globale
taux_tva_global = self.doc.get("taux_tva_principal") or 20
@ -1045,7 +978,6 @@ class SagePDFGenerator:
def _draw_footer(self):
footer_y = 15 * mm
# Informations légales de la société
legal_parts = []
societe_name = self._get_societe_name()
@ -1060,11 +992,9 @@ class SagePDFGenerator:
if tva:
legal_parts.append(f"TVA: {tva}")
# Forme juridique et capital si disponibles
if self.societe_info:
forme = self._get_societe_field("forme_juridique", "")
capital = self._get_societe_field("capital", 0)
# Convertir en float si c'est une string
if isinstance(capital, str):
try:
capital = float(capital)
@ -1075,10 +1005,8 @@ class SagePDFGenerator:
f"{forme} au capital de {float(capital):,.0f}".replace(",", " ")
)
# Dessiner les informations légales
if legal_parts:
legal_text = "".join(legal_parts)
# Tronquer si trop long
if len(legal_text) > 100:
legal_text = legal_text[:97] + "..."
self._draw_text(
@ -1090,7 +1018,6 @@ class SagePDFGenerator:
align="center",
)
# Pagination
page_text = f"Page {self.page_number} / {self.total_pages}"
self._draw_text(
self.page_width / 2,

159
routes/enterprise.py Normal file
View file

@ -0,0 +1,159 @@
from fastapi import APIRouter, HTTPException, Query
import httpx
import logging
from datetime import datetime
from schemas import EntrepriseSearch, EntrepriseSearchResponse
from utils.enterprise import (
calculer_tva_intracommunautaire,
mapper_resultat_api,
rechercher_entreprise_api,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/entreprises", tags=["Entreprises"])
@router.get("/search", response_model=EntrepriseSearchResponse)
async def rechercher_entreprise(
q: str = Query(..., min_length=2, description="Nom d'entreprise, SIREN ou SIRET"),
per_page: int = Query(5, ge=1, le=25, description="Nombre de résultats (max 25)"),
):
try:
logger.info(f" Recherche entreprise: '{q}'")
# Appel API
api_response = await rechercher_entreprise_api(q, per_page)
resultats_api = api_response.get("results", [])
if not resultats_api:
logger.info(f"Aucun résultat pour: {q}")
return EntrepriseSearchResponse(total_results=0, results=[], query=q)
entreprises = []
for data in resultats_api:
entreprise = mapper_resultat_api(data)
if entreprise:
entreprises.append(entreprise)
logger.info(f" {len(entreprises)} résultat(s) trouvé(s)")
return EntrepriseSearchResponse(
total_results=len(entreprises), results=entreprises, query=q
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur recherche entreprise: {e}", exc_info=True)
raise HTTPException(
status_code=500, detail=f"Erreur lors de la recherche: {str(e)}"
)
@router.get("/siren/{siren}", response_model=EntrepriseSearch)
async def lire_entreprise_par_siren(
siren: str = Query(
...,
min_length=9,
max_length=9,
regex=r"^\d{9}$",
description="Numéro SIREN (9 chiffres)",
),
):
try:
logger.info(f"Lecture entreprise SIREN: {siren}")
api_response = await rechercher_entreprise_api(siren, per_page=1)
resultats = api_response.get("results", [])
if not resultats:
raise HTTPException(
status_code=404,
detail=f"Aucune entreprise trouvée pour le SIREN {siren}",
)
entreprise_data = resultats[0]
if entreprise_data.get("siren") != siren:
raise HTTPException(status_code=404, detail=f"SIREN {siren} introuvable")
entreprise = mapper_resultat_api(entreprise_data)
if not entreprise:
raise HTTPException(
status_code=500,
detail="Erreur lors du traitement des données entreprise",
)
if not entreprise.is_active:
logger.warning(f" Entreprise CESSÉE: {siren}")
return entreprise
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur lecture SIREN {siren}: {e}", exc_info=True)
raise HTTPException(
status_code=500, detail=f"Erreur lors de la récupération: {str(e)}"
)
@router.get("/tva/{siren}")
async def calculer_tva(
siren: str = Query(
...,
min_length=9,
max_length=9,
regex=r"^\d{9}$",
description="Numéro SIREN (9 chiffres)",
),
):
tva_number = calculer_tva_intracommunautaire(siren)
if not tva_number:
raise HTTPException(status_code=400, detail=f"SIREN invalide: {siren}")
return {
"siren": siren,
"vat_number": tva_number,
"format": "FR + Clé (2 chiffres) + SIREN (9 chiffres)",
}
@router.get("/health")
async def health_check_api_sirene():
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
"https://recherche-entreprises.api.gouv.fr/search",
params={"q": "test", "per_page": 1},
)
if response.status_code == 200:
return {
"status": "healthy",
"api_sirene": "disponible",
"response_time_ms": response.elapsed.total_seconds() * 1000,
"timestamp": datetime.now().isoformat(),
}
else:
return {
"status": "degraded",
"api_sirene": f"statut {response.status_code}",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Health check failed: {e}")
return {
"status": "unhealthy",
"api_sirene": "indisponible",
"error": str(e),
"timestamp": datetime.now().isoformat(),
}

View file

@ -61,6 +61,8 @@ from schemas.sage.sage_gateway import (
from schemas.society.societe import SocieteInfo
from schemas.society.enterprise import EntrepriseSearch, EntrepriseSearchResponse
__all__ = [
"TiersDetails",
"TypeTiers",
@ -116,5 +118,7 @@ __all__ = [
"SyncStatsResponse",
"CreateSignatureRequest",
"TransactionResponse",
"SocieteInfo"
"SocieteInfo",
"EntrepriseSearch",
"EntrepriseSearchResponse",
]

View file

@ -0,0 +1,24 @@
from pydantic import BaseModel, Field
from typing import Optional, List
class EntrepriseSearch(BaseModel):
"""Modèle de réponse pour une entreprise trouvée"""
company_name: str = Field(..., description="Raison sociale complète")
siren: str = Field(..., description="Numéro SIREN (9 chiffres)")
vat_number: str = Field(..., description="Numéro de TVA intracommunautaire")
address: str = Field(..., description="Adresse complète du siège")
naf_code: str = Field(..., description="Code NAF/APE")
is_active: bool = Field(..., description="True si entreprise active")
siret_siege: Optional[str] = Field(None, description="SIRET du siège")
code_postal: Optional[str] = None
ville: Optional[str] = None
class EntrepriseSearchResponse(BaseModel):
"""Réponse globale de la recherche"""
total_results: int
results: List[EntrepriseSearch]
query: str

136
utils/enterprise.py Normal file
View file

@ -0,0 +1,136 @@
from fastapi import HTTPException
from typing import Optional
import httpx
import logging
from schemas import EntrepriseSearch
logger = logging.getLogger(__name__)
def calculer_tva_intracommunautaire(siren: str) -> Optional[str]:
try:
siren_clean = siren.replace(" ", "").strip()
if not siren_clean.isdigit() or len(siren_clean) != 9:
logger.warning(f"SIREN invalide: {siren}")
return None
siren_int = int(siren_clean)
cle = (12 + 3 * (siren_int % 97)) % 97
cle_str = f"{cle:02d}"
return f"FR{cle_str}{siren_clean}"
except Exception as e:
logger.error(f"Erreur calcul TVA pour SIREN {siren}: {e}")
return None
def formater_adresse(siege_data: dict) -> str:
try:
adresse_parts = []
if siege_data.get("numero_voie"):
adresse_parts.append(siege_data["numero_voie"])
if siege_data.get("type_voie"):
adresse_parts.append(siege_data["type_voie"])
if siege_data.get("libelle_voie"):
adresse_parts.append(siege_data["libelle_voie"])
if siege_data.get("code_postal"):
adresse_parts.append(siege_data["code_postal"])
if siege_data.get("libelle_commune"):
adresse_parts.append(siege_data["libelle_commune"].upper())
return " ".join(adresse_parts)
except Exception as e:
logger.error(f"Erreur formatage adresse: {e}")
return ""
async def rechercher_entreprise_api(query: str, per_page: int = 5) -> dict:
api_url = "https://recherche-entreprises.api.gouv.fr/search"
params = {
"q": query,
"per_page": per_page,
"limite_etablissements": 5,
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(api_url, params=params)
if response.status_code == 429:
logger.warning("Rate limit atteint (7 req/s)")
raise HTTPException(
status_code=429,
detail="Trop de requêtes. Veuillez réessayer dans 1 seconde.",
)
if response.status_code == 503:
logger.error("API Sirene indisponible (503)")
raise HTTPException(
status_code=503,
detail="Service de recherche momentanément indisponible.",
)
response.raise_for_status()
return response.json()
except httpx.TimeoutException:
logger.error(f"Timeout lors de la recherche: {query}")
raise HTTPException(
status_code=504, detail="Délai d'attente dépassé pour l'API de recherche."
)
except httpx.HTTPError as e:
logger.error(f"Erreur HTTP API Sirene: {e}")
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la communication avec l'API: {str(e)}",
)
def mapper_resultat_api(entreprise_data: dict) -> Optional[EntrepriseSearch]:
try:
siren = entreprise_data.get("siren")
if not siren:
logger.warning("Entreprise sans SIREN, ignorée")
return None
tva_number = calculer_tva_intracommunautaire(siren)
if not tva_number:
logger.warning(f"Impossible de calculer TVA pour SIREN: {siren}")
return None
siege = entreprise_data.get("siege", {})
etat_admin = entreprise_data.get("etat_administratif", "A")
is_active = etat_admin == "A"
return EntrepriseSearch(
company_name=entreprise_data.get("nom_complet", ""),
siren=siren,
vat_number=tva_number,
address=formater_adresse(siege),
naf_code=entreprise_data.get("activite_principale", ""),
is_active=is_active,
siret_siege=siege.get("siret"),
code_postal=siege.get("code_postal"),
ville=siege.get("libelle_commune"),
)
except Exception as e:
logger.error(f"Erreur mapping entreprise: {e}", exc_info=True)
return None

View file

@ -423,7 +423,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"SIGNE": {
"fr": "Signé avec succès",
"en": "Successfully signed",
"icon": "",
"icon": "",
"color": "green",
},
"REFUSE": {
@ -441,7 +441,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"ERREUR": {
"fr": "Erreur technique",
"en": "Technical error",
"icon": "⚠️",
"icon": "",
"color": "red",
},
}

View file

@ -90,7 +90,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"SIGNE": {
"fr": "Signé avec succès",
"en": "Successfully signed",
"icon": "",
"icon": "",
"color": "green",
},
"REFUSE": {
@ -108,7 +108,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"ERREUR": {
"fr": "Erreur technique",
"en": "Technical error",
"icon": "⚠️",
"icon": "",
"color": "red",
},
}