import asyncio
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple

import requests
from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)

# Storage directory for signed PDFs; overridable through SIGNED_DOCS_PATH.
# It is created at import time so later writes can assume it exists.
SIGNED_DOCS_DIR = Path(os.getenv("SIGNED_DOCS_PATH", "/app/data/signed_documents"))
SIGNED_DOCS_DIR.mkdir(parents=True, exist_ok=True)


class UniversignDocumentService:
    """Download, store and clean up signed Universign documents."""

    # Files smaller than this are treated as failed downloads (suspicious).
    MIN_DOCUMENT_SIZE_BYTES = 1024
    # Chunk size used when streaming the HTTP body to disk.
    DOWNLOAD_CHUNK_SIZE = 8192

    def __init__(self, api_key: str, timeout: int = 60):
        """Store the credentials and HTTP timeout used for downloads.

        Args:
            api_key: API key, sent as the HTTP basic-auth username with an
                empty password.
            timeout: Timeout in seconds passed to ``requests.get``.
        """
        self.api_key = api_key
        self.timeout = timeout
        self.auth = (api_key, "")

    async def download_and_store_signed_document(
        self, session: AsyncSession, transaction, force: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """Download the signed PDF of ``transaction`` into ``SIGNED_DOCS_DIR``.

        The outcome is recorded on ``transaction`` (path, size, timestamp or
        ``download_error``) and committed through ``session``.

        Args:
            session: Async session used to commit the transaction updates.
            transaction: Object exposing ``transaction_id``, ``document_url``,
                ``signed_document_path``, ``download_attempts`` and the other
                ``signed_document_*`` / ``download_error`` attributes.
            force: When true, download again even if a stored file exists.

        Returns:
            ``(True, None)`` on success or when the document is already on
            disk, otherwise ``(False, error_message)``.
        """
        if not force and transaction.signed_document_path:
            if os.path.exists(transaction.signed_document_path):
                logger.debug(f"Document déjà téléchargé : {transaction.transaction_id}")
                return True, None

        if not transaction.document_url:
            error = "Aucune URL de document disponible"
            logger.warning(f"{error} pour {transaction.transaction_id}")
            return await self._record_failure(session, transaction, error)

        try:
            logger.info(f"Téléchargement document signé : {transaction.transaction_id}")
            # Tolerate a counter that was never initialised (None).
            transaction.download_attempts = (transaction.download_attempts or 0) + 1

            filename = self._generate_filename(transaction)
            file_path = SIGNED_DOCS_DIR / filename

            # requests is blocking: run the transfer in a worker thread so the
            # event loop stays responsive while the file is downloaded.
            loop = asyncio.get_running_loop()
            error = await loop.run_in_executor(
                None, self._download_to_file, transaction.document_url, file_path
            )
            if error is not None:
                logger.error(error)
                return await self._record_failure(session, transaction, error)

            file_size = os.path.getsize(file_path)
            if file_size < self.MIN_DOCUMENT_SIZE_BYTES:  # under 1 KB = suspicious
                error = f"Fichier trop petit : {file_size} octets"
                logger.error(error)
                os.remove(file_path)
                return await self._record_failure(session, transaction, error)

            transaction.signed_document_path = str(file_path)
            transaction.signed_document_downloaded_at = datetime.now()
            transaction.signed_document_size_bytes = file_size
            transaction.download_error = None
            await session.commit()

            logger.info(f"Document téléchargé : {filename} ({file_size / 1024:.1f} KB)")
            return True, None

        # RequestException derives from OSError, so it must be handled first.
        except requests.exceptions.RequestException as e:
            error = f"Erreur HTTP : {str(e)}"
            logger.error(f"{error} pour {transaction.transaction_id}")
            return await self._record_failure(session, transaction, error)
        except OSError as e:
            error = f"Erreur filesystem : {str(e)}"
            logger.error(f"{error}")
            return await self._record_failure(session, transaction, error)
        except Exception as e:
            error = f"Erreur inattendue : {str(e)}"
            logger.error(f"{error}", exc_info=True)
            return await self._record_failure(session, transaction, error)

    def _download_to_file(self, url: str, file_path: Path) -> Optional[str]:
        """Stream ``url`` into ``file_path`` (blocking; meant for a worker thread).

        Returns:
            ``None`` when the body was written, or an error message when the
            response does not declare a PDF content type (nothing is written).

        Raises:
            requests.exceptions.RequestException: On HTTP/network failure.
            OSError: When the file cannot be written.
        """
        # The context manager releases the connection even on early return.
        with requests.get(
            url,
            auth=self.auth,
            timeout=self.timeout,
            stream=True,
        ) as response:
            response.raise_for_status()

            content_type = response.headers.get("Content-Type", "")
            if "pdf" not in content_type.lower():
                return f"Type de contenu invalide : {content_type}"

            try:
                with open(file_path, "wb") as f:
                    for chunk in response.iter_content(
                        chunk_size=self.DOWNLOAD_CHUNK_SIZE
                    ):
                        if chunk:
                            f.write(chunk)
            except Exception:
                # Do not leave a truncated document behind.
                try:
                    os.remove(file_path)
                except OSError:
                    pass
                raise
        return None

    async def _record_failure(
        self, session: AsyncSession, transaction, error: str
    ) -> Tuple[bool, str]:
        """Persist ``error`` on the transaction and return the failure tuple."""
        transaction.download_error = error
        await session.commit()
        return False, error

    def _generate_filename(self, transaction) -> str:
        """Build ``<sage_document_id>_<transaction id without "tr_">_<timestamp>.pdf``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        tx_id = transaction.transaction_id.replace("tr_", "")
        filename = f"{transaction.sage_document_id}_{tx_id}_{timestamp}.pdf"
        # Neutralise path separators so an unexpected identifier can never
        # make the file land outside SIGNED_DOCS_DIR.
        return filename.replace("/", "_").replace("\\", "_")

    def get_document_path(self, transaction) -> Optional[Path]:
        """Return the stored document path if it exists on disk, else ``None``."""
        if not transaction.signed_document_path:
            return None

        path = Path(transaction.signed_document_path)
        return path if path.exists() else None

    async def cleanup_old_documents(self, days_to_keep: int = 90) -> Tuple[int, int]:
        """Delete PDFs in ``SIGNED_DOCS_DIR`` last modified over ``days_to_keep`` days ago.

        Failures on individual files are logged and skipped.

        Returns:
            ``(deleted_count, freed_megabytes)`` where the freed size is
            truncated to whole megabytes.
        """
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)

        deleted = 0
        size_freed = 0

        for file_path in SIGNED_DOCS_DIR.glob("*.pdf"):
            try:
                # One stat() call gives both the modification time and size.
                stats = file_path.stat()
                if datetime.fromtimestamp(stats.st_mtime) < cutoff_date:
                    os.remove(file_path)
                    size_freed += stats.st_size
                    deleted += 1
                    logger.info(f"🗑️ Supprimé : {file_path.name}")
            except Exception as e:
                logger.error(f"Erreur suppression {file_path}: {e}")

        size_freed_mb = size_freed / (1024 * 1024)
        logger.info(
            f"Nettoyage terminé : {deleted} fichiers supprimés "
            f"({size_freed_mb:.2f} MB libérés)"
        )

        return deleted, int(size_freed_mb)