# Sage100-vps/services/signed_documents.py
# (Python source; the original listing reported 188 lines, 6.6 KiB)

import asyncio
import hashlib
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

import requests
from sqlalchemy.ext.asyncio import AsyncSession

from database import UniversignTransaction
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class SignedDocuments:
    """Service that downloads, stores and checks signed documents on disk.

    Files are kept under ``storage_path``, in one sub-directory per Sage
    document type (see ``_get_storage_subdir``).
    """

    # Sage document type code -> storage sub-directory name.
    _SUBDIR_BY_DOC_TYPE = {
        0: "devis",
        10: "commandes",
        30: "livraisons",
        50: "avoirs",
        60: "factures",
    }
    # Sub-directory used for any document type missing from the mapping.
    _FALLBACK_SUBDIR = "autres"
    # HTTP timeout (seconds) and streaming chunk size (bytes) for downloads.
    _DOWNLOAD_TIMEOUT = 60
    _CHUNK_SIZE = 8192

    def __init__(self, storage_path: str = "./data/signed_documents"):
        """Create the storage root and every per-type sub-directory.

        Args:
            storage_path: Root directory for stored documents; created
                (with parents) when it does not exist yet.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        # The fallback directory is created too: _get_storage_subdir can
        # return it, and writing into a missing directory would fail.
        for subdir in (*self._SUBDIR_BY_DOC_TYPE.values(), self._FALLBACK_SUBDIR):
            (self.storage_path / subdir).mkdir(exist_ok=True)

    def _get_storage_subdir(self, sage_doc_type: int) -> str:
        """Return the sub-directory name for a Sage document type code.

        Unknown codes map to the fallback sub-directory (``"autres"``).
        """
        return self._SUBDIR_BY_DOC_TYPE.get(sage_doc_type, self._FALLBACK_SUBDIR)

    def _generate_filename(
        self, transaction_id: str, sage_doc_id: str, sage_doc_type: int
    ) -> str:
        """Build the file name under which a signed document is stored.

        The name is ``<doc id>_<YYYYmmdd_HHMMSS>_<8 hex chars>_signed.pdf``
        where the hex part is derived from ``transaction_id``.

        Args:
            transaction_id: Identifier hashed into the name suffix.
            sage_doc_id: Sage document identifier used as the name prefix.
            sage_doc_type: Unused; kept for interface compatibility.
        """
        # MD5 is only used to derive a short, stable suffix (not for security).
        digest = hashlib.md5(transaction_id.encode()).hexdigest()[:8]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Keep only characters that are safe inside a single path component so
        # an identifier containing separators cannot leave the storage folder.
        safe_doc_id = "".join(
            ch if ch.isalnum() or ch in "-_." else "_" for ch in str(sage_doc_id)
        )
        return f"{safe_doc_id}_{timestamp}_{digest}_signed.pdf"

    def _fetch_to_file(
        self, document_url: str, api_key: str, file_path: Path
    ) -> Optional[str]:
        """Stream ``document_url`` into ``file_path`` (blocking).

        The payload is first written next to the destination with a ``.part``
        suffix and renamed only once fully received, so an interrupted
        download never leaves a truncated ``.pdf`` behind.

        Returns:
            ``None`` on success, or an error message when the server answers
            with a status other than 200.

        Raises:
            requests.exceptions.RequestException: On network failure.
            OSError: When the file cannot be written.
        """
        response = requests.get(
            document_url,
            auth=(api_key, ""),
            timeout=self._DOWNLOAD_TIMEOUT,
            stream=True,
        )
        try:
            if response.status_code != 200:
                return f"Erreur HTTP {response.status_code} lors du téléchargement"

            # A non-PDF content type is only reported, not treated as fatal.
            content_type = response.headers.get("Content-Type", "")
            if "pdf" not in content_type.lower():
                logger.warning(f"Type de contenu invalide: {content_type}")

            file_path.parent.mkdir(parents=True, exist_ok=True)
            partial_path = file_path.with_name(file_path.name + ".part")
            try:
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
                partial_path.replace(file_path)
            except Exception:
                # Best-effort removal of the incomplete file, then re-raise.
                if partial_path.exists():
                    try:
                        partial_path.unlink()
                    except OSError:
                        logger.warning(
                            f"Impossible de supprimer le fichier partiel: {partial_path}"
                        )
                raise
            return None
        finally:
            # With stream=True the connection stays open until closed.
            response.close()

    async def download_and_store(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        document_url: str,
        api_key: str,
    ) -> Tuple[bool, Optional[str], Optional[str]]:
        """Download the signed document and store it on disk.

        The HTTP download and the file write run in a worker thread so the
        event loop is not blocked. On success the transaction's
        ``signed_document_path`` and ``signed_document_downloaded_at`` are
        set and the session is committed.

        Args:
            session: Session committed after the transaction is updated.
            transaction: Transaction the document belongs to.
            document_url: URL the document is fetched from.
            api_key: Key sent as the HTTP basic-auth user name.

        Returns:
            ``(success, file_path, error_message)``; errors are reported
            through the tuple rather than raised.
        """
        try:
            logger.info(f"Téléchargement document signé: {transaction.transaction_id}")

            # Read the transaction attributes here, on the event-loop side;
            # the worker thread only receives plain values.
            doc_type = transaction.sage_document_type.value
            filename = self._generate_filename(
                transaction.transaction_id,
                transaction.sage_document_id,
                doc_type,
            )
            file_path = self.storage_path / self._get_storage_subdir(doc_type) / filename

            loop = asyncio.get_running_loop()
            error = await loop.run_in_executor(
                None, self._fetch_to_file, document_url, api_key, file_path
            )
            if error is not None:
                logger.error(error)
                return False, None, error

            file_size = file_path.stat().st_size
            logger.info(f"Document stocké: {file_path} ({file_size} octets)")

            transaction.signed_document_path = str(file_path)
            transaction.signed_document_downloaded_at = datetime.now()
            await session.commit()
            return True, str(file_path), None
        except requests.exceptions.RequestException as e:
            error = f"Erreur réseau lors du téléchargement: {str(e)}"
            logger.error(error, exc_info=True)
            return False, None, error
        except OSError as e:
            error = f"Erreur d'écriture du fichier: {str(e)}"
            logger.error(error, exc_info=True)
            return False, None, error
        except Exception as e:
            error = f"Erreur inattendue: {str(e)}"
            logger.error(error, exc_info=True)
            return False, None, error

    def get_document_path(self, transaction: UniversignTransaction) -> Optional[Path]:
        """Return the stored document's path, or ``None`` when unavailable.

        ``None`` is returned when the transaction has no recorded path or
        when the recorded path is not an existing regular file.
        """
        if not transaction.signed_document_path:
            return None
        path = Path(transaction.signed_document_path)
        if not path.is_file():
            logger.warning(f"Document signé introuvable: {path}")
            return None
        return path

    def verify_document_integrity(self, file_path: Path) -> bool:
        """Run basic sanity checks on a stored document.

        The file must exist, be non-empty, have a ``.pdf`` extension and
        start with the ``%PDF-`` signature. ``file_path`` may be a ``Path``
        or a string. Returns ``False`` instead of raising on I/O errors.
        """
        try:
            file_path = Path(file_path)
            if not file_path.is_file():
                return False
            if file_path.stat().st_size == 0:
                logger.error(f"Document vide: {file_path}")
                return False
            if file_path.suffix.lower() != ".pdf":
                logger.error(f"Extension invalide: {file_path}")
                return False
            # Check the leading bytes against the PDF signature.
            with open(file_path, "rb") as f:
                header = f.read(5)
            if header != b"%PDF-":
                logger.error(f"Signature PDF invalide: {file_path}")
                return False
            return True
        except (OSError, TypeError) as e:
            logger.error(f"Erreur vérification intégrité: {e}")
            return False

    async def cleanup_old_documents(self, days_to_keep: int = 365) -> int:
        """Delete stored ``*.pdf`` files older than ``days_to_keep`` days.

        Age is based on the file's modification time. A failure on one file
        is logged and does not stop the remaining files from being processed.

        Returns:
            The number of files actually deleted.
        """
        cutoff_date = datetime.now().timestamp() - (days_to_keep * 86400)
        deleted_count = 0
        try:
            for subdir in self.storage_path.iterdir():
                if not subdir.is_dir():
                    continue
                for file_path in subdir.glob("*.pdf"):
                    try:
                        if file_path.stat().st_mtime >= cutoff_date:
                            continue
                        logger.info(f"Suppression ancien document: {file_path}")
                        file_path.unlink()
                        deleted_count += 1
                    except OSError as e:
                        logger.error(f"Erreur nettoyage documents: {e}")
        except OSError as e:
            logger.error(f"Erreur nettoyage documents: {e}")
        logger.info(f"Nettoyage terminé: {deleted_count} document(s) supprimé(s)")
        return deleted_count
# Shared module-level instance; the storage root can be overridden through
# the SIGNED_DOCS_PATH environment variable.
signed_documents = SignedDocuments(
    storage_path=os.environ.get("SIGNED_DOCS_PATH", "./data/signed_documents")
)