214 lines
5.3 KiB
Python
214 lines
5.3 KiB
Python
from typing import Optional
|
|
from datetime import datetime, date
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _clean_str(value, max_len: int) -> str:
|
|
"""Nettoie et tronque une chaîne"""
|
|
if value is None or str(value).lower() in ("none", "null", ""):
|
|
return ""
|
|
return str(value)[:max_len].strip()
|
|
|
|
|
|
def _safe_strip(value) -> Optional[str]:
|
|
"""Nettoie une valeur string en toute sécurité"""
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, str):
|
|
stripped = value.strip()
|
|
return stripped if stripped else None
|
|
return str(value).strip() or None
|
|
|
|
|
|
def _safe_int(value, default=None):
|
|
"""Conversion sécurisée en entier"""
|
|
if value is None:
|
|
return default
|
|
try:
|
|
return int(value)
|
|
except (ValueError, TypeError):
|
|
return default
|
|
|
|
|
|
def _try_set_attribute(obj, attr_name, value, variants=None):
|
|
"""Essaie de définir un attribut avec plusieurs variantes"""
|
|
if variants is None:
|
|
variants = [attr_name]
|
|
else:
|
|
variants = [attr_name] + variants
|
|
|
|
for variant in variants:
|
|
try:
|
|
if hasattr(obj, variant):
|
|
setattr(obj, variant, value)
|
|
return True
|
|
except Exception as e:
|
|
logger.debug(f" {variant} échec: {str(e)[:50]}")
|
|
|
|
return False
|
|
|
|
|
|
def _get_type_libelle(type_doc: int) -> str:
|
|
types_officiels = {
|
|
0: "Devis",
|
|
10: "Bon de commande",
|
|
20: "Préparation",
|
|
30: "Bon de livraison",
|
|
40: "Bon de retour",
|
|
50: "Bon d'avoir",
|
|
60: "Facture",
|
|
}
|
|
|
|
types_alternatifs = {
|
|
1: "Bon de commande",
|
|
2: "Préparation",
|
|
3: "Bon de livraison",
|
|
4: "Bon de retour",
|
|
5: "Bon d'avoir",
|
|
6: "Facture",
|
|
}
|
|
|
|
if type_doc in types_officiels:
|
|
return types_officiels[type_doc]
|
|
|
|
if type_doc in types_alternatifs:
|
|
return types_alternatifs[type_doc]
|
|
|
|
return f"Type {type_doc}"
|
|
|
|
|
|
def _convertir_type_pour_sql(type_doc: int) -> int:
|
|
"""COM → SQL : 0, 10, 20, 30... → 0, 1, 2, 3..."""
|
|
mapping = {0: 0, 10: 1, 20: 2, 30: 3, 40: 4, 50: 5, 60: 6}
|
|
return mapping.get(type_doc, type_doc)
|
|
|
|
|
|
def _convertir_type_depuis_sql(type_sql: int) -> int:
|
|
"""SQL → COM : 0, 1, 2, 3... → 0, 10, 20, 30..."""
|
|
mapping = {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60}
|
|
return mapping.get(type_sql, type_sql)
|
|
|
|
|
|
def _normaliser_type_document(type_doc: int) -> int:
|
|
logger.info(f"[INFO] TYPE RECU{type_doc}")
|
|
|
|
if type_doc in [0, 10, 20, 30, 40, 50, 60]:
|
|
return type_doc
|
|
|
|
mapping_normalisation = {
|
|
1: 10,
|
|
2: 20,
|
|
3: 30,
|
|
4: 40,
|
|
5: 50,
|
|
6: 60,
|
|
}
|
|
|
|
return mapping_normalisation.get(type_doc, type_doc)
|
|
|
|
|
|
def normaliser_date(valeur):
|
|
"""Parse flexible des dates - supporte ISO, datetime, YYYY-MM-DD HH:MM:SS"""
|
|
if valeur is None:
|
|
return datetime.now()
|
|
|
|
if isinstance(valeur, datetime):
|
|
return valeur
|
|
|
|
if isinstance(valeur, date):
|
|
return datetime.combine(valeur, datetime.min.time())
|
|
|
|
if isinstance(valeur, str):
|
|
valeur = valeur.strip()
|
|
|
|
if not valeur:
|
|
return datetime.now()
|
|
|
|
if valeur.endswith("Z"):
|
|
valeur = valeur[:-1] + "+00:00"
|
|
|
|
formats = [
|
|
"%Y-%m-%dT%H:%M:%S.%f%z",
|
|
"%Y-%m-%dT%H:%M:%S%z",
|
|
"%Y-%m-%dT%H:%M:%S.%f",
|
|
"%Y-%m-%dT%H:%M:%S",
|
|
"%Y-%m-%d %H:%M:%S.%f",
|
|
"%Y-%m-%d %H:%M:%S",
|
|
"%Y-%m-%d %H:%M",
|
|
"%Y-%m-%d",
|
|
"%d/%m/%Y %H:%M:%S",
|
|
"%d/%m/%Y %H:%M",
|
|
"%d/%m/%Y",
|
|
]
|
|
|
|
for fmt in formats:
|
|
try:
|
|
dt = datetime.strptime(valeur, fmt)
|
|
return dt.replace(tzinfo=None) if dt.tzinfo else dt
|
|
except ValueError:
|
|
continue
|
|
|
|
try:
|
|
if "+" in valeur:
|
|
valeur = valeur.split("+")[0]
|
|
dt = datetime.fromisoformat(valeur)
|
|
return dt.replace(tzinfo=None) if dt.tzinfo else dt
|
|
except ValueError:
|
|
pass
|
|
|
|
return datetime.now()
|
|
|
|
|
|
def _parser_heure_sage(do_heure) -> str:
|
|
"""Parse DO_Heure format Sage (HHMMSS stocké en entier)"""
|
|
if not do_heure:
|
|
return "00:00:00"
|
|
|
|
try:
|
|
heure_int = int(str(do_heure).strip())
|
|
|
|
heure_str = str(heure_int).zfill(6)
|
|
|
|
hh = int(heure_str[0:2])
|
|
mm = int(heure_str[2:4])
|
|
ss = int(heure_str[4:6])
|
|
|
|
if 0 <= hh <= 23 and 0 <= mm <= 59 and 0 <= ss <= 59:
|
|
return f"{hh:02d}:{mm:02d}:{ss:02d}"
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
return "00:00:00"
|
|
|
|
|
|
def _combiner_date_heure(do_date, do_heure) -> str:
|
|
"""Combine DO_Date et DO_Heure en datetime string"""
|
|
if not do_date:
|
|
return ""
|
|
|
|
try:
|
|
date_str = (
|
|
do_date.strftime("%Y-%m-%d")
|
|
if hasattr(do_date, "strftime")
|
|
else str(do_date)[:10]
|
|
)
|
|
heure_str = _parser_heure_sage(do_heure)
|
|
return f"{date_str} {heure_str}"
|
|
except Exception:
|
|
return str(do_date) if do_date else ""
|
|
|
|
|
|
__all__ = [
|
|
"_clean_str",
|
|
"_safe_strip",
|
|
"_safe_int",
|
|
"_try_set_attribute",
|
|
"_get_type_libelle",
|
|
"_normaliser_type_document",
|
|
"_convertir_type_depuis_sql",
|
|
"_convertir_type_pour_sql",
|
|
"normaliser_date",
|
|
"_combiner_date_heure",
|
|
]
|