136 lines
4 KiB
Python
136 lines
4 KiB
Python
from fastapi import HTTPException
|
|
from typing import Optional
|
|
import httpx
|
|
import logging
|
|
|
|
from schemas import EntrepriseSearch
|
|
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def calculer_tva_intracommunautaire(siren: str) -> Optional[str]:
    """Build the French intra-community VAT number for a SIREN.

    The result is ``"FR"`` followed by a two-digit check key and the
    nine-digit SIREN, where the key is ``(12 + 3 * (SIREN mod 97)) mod 97``.

    Args:
        siren: The SIREN as a string. Space characters anywhere in the value
            and leading/trailing whitespace are ignored.

    Returns:
        The VAT number, or ``None`` (after logging a warning) when ``siren``
        is not a string made of exactly nine ASCII digits.
    """
    if not isinstance(siren, str):
        logger.warning("SIREN invalide: %s", siren)
        return None

    siren_clean = siren.replace(" ", "").strip()

    # str.isdigit() alone also accepts non-ASCII digits (Arabic-Indic,
    # superscripts, ...), which would either leak into the VAT number or make
    # int() fail; requiring ASCII keeps the output strictly "FR" + 11 digits.
    is_valid = (
        len(siren_clean) == 9
        and siren_clean.isascii()
        and siren_clean.isdigit()
    )
    if not is_valid:
        logger.warning("SIREN invalide: %s", siren)
        return None

    cle = (12 + 3 * (int(siren_clean) % 97)) % 97

    # The key is always rendered on two digits (e.g. 7 -> "07").
    return f"FR{cle:02d}{siren_clean}"
|
|
|
|
|
|
def formater_adresse(siege_data: dict) -> str:
    """Assemble a single-line postal address from a head-office record.

    The keys ``numero_voie``, ``type_voie``, ``libelle_voie``, ``code_postal``
    and ``libelle_commune`` are read in that order; missing or falsy values
    are skipped and the remaining ones are joined with single spaces. The
    commune name is upper-cased.

    Args:
        siege_data: Mapping holding the address components. ``None`` or an
            empty mapping yields an empty string.

    Returns:
        The formatted address, or ``""`` when nothing usable is present or
        when formatting fails (the failure is logged).
    """
    if not siege_data:
        return ""

    field_names = (
        "numero_voie",
        "type_voie",
        "libelle_voie",
        "code_postal",
        "libelle_commune",
    )

    try:
        parts = []
        for field_name in field_names:
            value = siege_data.get(field_name)
            if not value:
                continue
            # Coerce to str so a numeric component (e.g. an integer street
            # number) does not make the join fail and wipe the whole address.
            text = str(value)
            if field_name == "libelle_commune":
                text = text.upper()
            parts.append(text)
        return " ".join(parts)
    except Exception as e:
        # Best effort: address formatting must never break the caller.
        logger.error("Erreur formatage adresse: %s", e)
        return ""
|
|
|
|
|
|
async def rechercher_entreprise_api(query: str, per_page: int = 5) -> dict:
    """Call the recherche-entreprises.api.gouv.fr search endpoint.

    Sends a GET request with ``q``, ``per_page`` and a fixed
    ``limite_etablissements`` of 5, using a 10-second timeout.

    Args:
        query: Search text, sent as the ``q`` parameter.
        per_page: Value sent as the ``per_page`` parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        HTTPException: 429 or 503 when the upstream API answers with that
            status; 504 when the request times out; 502 when the body is not
            valid JSON; 500 for any other ``httpx.HTTPError`` (including other
            error statuses reported by ``raise_for_status``).
    """
    api_url = "https://recherche-entreprises.api.gouv.fr/search"

    params = {
        "q": query,
        "per_page": per_page,
        "limite_etablissements": 5,
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(api_url, params=params)

            if response.status_code == 429:
                logger.warning("Rate limit atteint (7 req/s)")
                raise HTTPException(
                    status_code=429,
                    detail="Trop de requêtes. Veuillez réessayer dans 1 seconde.",
                )

            if response.status_code == 503:
                logger.error("API Sirene indisponible (503)")
                raise HTTPException(
                    status_code=503,
                    detail="Service de recherche momentanément indisponible.",
                )

            response.raise_for_status()

            try:
                return response.json()
            except ValueError as exc:
                # A 2xx answer with a non-JSON body would otherwise escape as
                # a bare decoding error; report it as a bad upstream response.
                logger.error("Réponse JSON invalide de l'API Sirene: %s", exc)
                raise HTTPException(
                    status_code=502,
                    detail="Réponse invalide du service de recherche.",
                ) from exc

    except httpx.TimeoutException as exc:
        # Must be handled before httpx.HTTPError, which would also match it.
        logger.error("Timeout lors de la recherche: %s", query)
        raise HTTPException(
            status_code=504, detail="Délai d'attente dépassé pour l'API de recherche."
        ) from exc

    except httpx.HTTPError as e:
        logger.error("Erreur HTTP API Sirene: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lors de la communication avec l'API: {str(e)}",
        ) from e
|
|
|
|
|
|
def mapper_resultat_api(entreprise_data: dict) -> Optional[EntrepriseSearch]:
    """Convert one raw search result into an ``EntrepriseSearch``.

    Args:
        entreprise_data: One company record as returned by the search API.
            The keys read are ``siren``, ``siege``, ``etat_administratif``,
            ``nom_complet`` and ``activite_principale``.

    Returns:
        The mapped ``EntrepriseSearch``, or ``None`` when the record has no
        SIREN, when no VAT number can be derived from it, or when any
        unexpected error occurs during mapping (the error is logged).
    """
    try:
        siren = entreprise_data.get("siren")
        if not siren:
            logger.warning("Entreprise sans SIREN, ignorée")
            return None

        tva_number = calculer_tva_intracommunautaire(siren)
        if not tva_number:
            logger.warning(f"Impossible de calculer TVA pour SIREN: {siren}")
            return None

        # `.get("siege", {})` still yields None when the key is present with a
        # null value, which would make every `siege.get(...)` below raise and
        # drop the whole company; fall back to an empty mapping instead.
        siege = entreprise_data.get("siege") or {}

        # A missing administrative state is treated as "A"; only "A" counts
        # as active.
        etat_admin = entreprise_data.get("etat_administratif", "A")
        is_active = etat_admin == "A"

        return EntrepriseSearch(
            company_name=entreprise_data.get("nom_complet", ""),
            siren=siren,
            vat_number=tva_number,
            address=formater_adresse(siege),
            naf_code=entreprise_data.get("activite_principale", ""),
            is_active=is_active,
            siret_siege=siege.get("siret"),
            code_postal=siege.get("code_postal"),
            ville=siege.get("libelle_commune"),
        )

    except Exception as e:
        # Best effort: one malformed record must not break the caller.
        logger.error(f"Erreur mapping entreprise: {e}", exc_info=True)
        return None
|