Sage100-vps/services/universign_document.py
2026-01-20 06:31:17 +03:00

361 lines
13 KiB
Python

import os
import logging
import requests
from pathlib import Path
from datetime import datetime
from typing import Optional, Tuple, Dict, List
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
SIGNED_DOCS_DIR = Path(os.getenv("SIGNED_DOCS_PATH", "/app/data/signed_documents"))
SIGNED_DOCS_DIR.mkdir(parents=True, exist_ok=True)
class UniversignDocumentService:
"""Service de gestion des documents signés Universign - VERSION CORRIGÉE"""
def __init__(self, api_url: str, api_key: str, timeout: int = 60):
self.api_url = api_url.rstrip("/")
self.api_key = api_key
self.timeout = timeout
self.auth = (api_key, "")
def fetch_transaction_documents(self, transaction_id: str) -> Optional[List[Dict]]:
try:
logger.info(f" Récupération documents pour transaction: {transaction_id}")
response = requests.get(
f"{self.api_url}/transactions/{transaction_id}",
auth=self.auth,
timeout=self.timeout,
headers={"Accept": "application/json"},
)
if response.status_code == 200:
data = response.json()
documents = data.get("documents", [])
logger.info(f"{len(documents)} document(s) trouvé(s)")
for idx, doc in enumerate(documents):
logger.debug(
f" Document {idx}: id={doc.get('id')}, "
f"name={doc.get('name')}, status={doc.get('status')}"
)
return documents
elif response.status_code == 404:
logger.warning(
f"Transaction {transaction_id} introuvable sur Universign"
)
return None
else:
logger.error(
f"Erreur HTTP {response.status_code} pour {transaction_id}: "
f"{response.text[:500]}"
)
return None
except requests.exceptions.Timeout:
logger.error(f"⏱️ Timeout récupération transaction {transaction_id}")
return None
except Exception as e:
logger.error(f" Erreur fetch documents: {e}", exc_info=True)
return None
def download_signed_document(
self, transaction_id: str, document_id: str
) -> Optional[bytes]:
try:
download_url = (
f"{self.api_url}/transactions/{transaction_id}"
f"/documents/{document_id}/download"
)
logger.info(f"Téléchargement depuis: {download_url}")
response = requests.get(
download_url,
auth=self.auth,
timeout=self.timeout,
stream=True,
)
if response.status_code == 200:
content_type = response.headers.get("Content-Type", "")
content_length = response.headers.get("Content-Length", "unknown")
logger.info(
f"Téléchargement réussi: "
f"Content-Type={content_type}, Size={content_length}"
)
if (
"pdf" not in content_type.lower()
and "octet-stream" not in content_type.lower()
):
logger.warning(
f"Type de contenu inattendu: {content_type}. "
f"Tentative de lecture quand même..."
)
content = response.content
if len(content) < 1024:
logger.error(f" Document trop petit: {len(content)} octets")
return None
return content
elif response.status_code == 404:
logger.error(
f" Document {document_id} introuvable pour transaction {transaction_id}"
)
return None
elif response.status_code == 403:
logger.error(
f" Accès refusé au document {document_id}. "
f"Vérifiez que la transaction est bien signée."
)
return None
else:
logger.error(
f" Erreur HTTP {response.status_code}: {response.text[:500]}"
)
return None
except requests.exceptions.Timeout:
logger.error(f"⏱️ Timeout téléchargement document {document_id}")
return None
except Exception as e:
logger.error(f" Erreur téléchargement: {e}", exc_info=True)
return None
async def download_and_store_signed_document(
self, session: AsyncSession, transaction, force: bool = False
) -> Tuple[bool, Optional[str]]:
if not force and transaction.signed_document_path:
if os.path.exists(transaction.signed_document_path):
logger.debug(
f"Document déjà téléchargé: {transaction.transaction_id}"
)
return True, None
transaction.download_attempts += 1
try:
logger.info(
f"Récupération document signé pour: {transaction.transaction_id}"
)
documents = self.fetch_transaction_documents(transaction.transaction_id)
if not documents:
error = "Aucun document trouvé dans la transaction Universign"
logger.warning(f"{error}")
transaction.download_error = error
await session.commit()
return False, error
document_id = None
for doc in documents:
doc_id = doc.get("id")
doc_status = doc.get("status", "").lower()
if doc_status in ["signed", "completed", "closed"]:
document_id = doc_id
logger.info(
f"Document signé trouvé: {doc_id} (status: {doc_status})"
)
break
if document_id is None:
document_id = doc_id
if not document_id:
error = "Impossible de déterminer l'ID du document à télécharger"
logger.error(f" {error}")
transaction.download_error = error
await session.commit()
return False, error
if hasattr(transaction, "universign_document_id"):
transaction.universign_document_id = document_id
pdf_content = self.download_signed_document(
transaction_id=transaction.transaction_id, document_id=document_id
)
if not pdf_content:
error = f"Échec téléchargement document {document_id}"
logger.error(f" {error}")
transaction.download_error = error
await session.commit()
return False, error
filename = self._generate_filename(transaction)
file_path = SIGNED_DOCS_DIR / filename
with open(file_path, "wb") as f:
f.write(pdf_content)
file_size = os.path.getsize(file_path)
transaction.signed_document_path = str(file_path)
transaction.signed_document_downloaded_at = datetime.now()
transaction.signed_document_size_bytes = file_size
transaction.download_error = None
transaction.document_url = (
f"{self.api_url}/transactions/{transaction.transaction_id}"
f"/documents/{document_id}/download"
)
await session.commit()
logger.info(
f"Document signé téléchargé: {filename} ({file_size / 1024:.1f} KB)"
)
return True, None
except OSError as e:
error = f"Erreur filesystem: {str(e)}"
logger.error(f" {error}")
transaction.download_error = error
await session.commit()
return False, error
except Exception as e:
error = f"Erreur inattendue: {str(e)}"
logger.error(f" {error}", exc_info=True)
transaction.download_error = error
await session.commit()
return False, error
def _generate_filename(self, transaction) -> str:
"""Génère un nom de fichier unique pour le document signé"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
tx_id = transaction.transaction_id.replace("tr_", "")
filename = f"{transaction.sage_document_id}_{tx_id}_{timestamp}_signed.pdf"
return filename
def get_document_path(self, transaction) -> Optional[Path]:
"""Retourne le chemin du document signé s'il existe"""
if not transaction.signed_document_path:
return None
path = Path(transaction.signed_document_path)
if path.exists():
return path
return None
async def cleanup_old_documents(self, days_to_keep: int = 90) -> Tuple[int, int]:
"""Supprime les anciens documents signés"""
from datetime import timedelta
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
deleted = 0
size_freed = 0
for file_path in SIGNED_DOCS_DIR.glob("*.pdf"):
try:
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_time < cutoff_date:
size_freed += os.path.getsize(file_path)
os.remove(file_path)
deleted += 1
logger.info(f"🗑️ Supprimé: {file_path.name}")
except Exception as e:
logger.error(f"Erreur suppression {file_path}: {e}")
size_freed_mb = size_freed / (1024 * 1024)
logger.info(
f"Nettoyage terminé: {deleted} fichiers supprimés "
f"({size_freed_mb:.2f} MB libérés)"
)
return deleted, int(size_freed_mb)
def diagnose_transaction(self, transaction_id: str) -> Dict:
"""
Diagnostic complet d'une transaction pour debug
"""
result = {
"transaction_id": transaction_id,
"api_url": self.api_url,
"timestamp": datetime.now().isoformat(),
"checks": {},
}
try:
logger.info(f"Diagnostic transaction: {transaction_id}")
response = requests.get(
f"{self.api_url}/transactions/{transaction_id}",
auth=self.auth,
timeout=self.timeout,
)
result["checks"]["transaction_fetch"] = {
"status_code": response.status_code,
"success": response.status_code == 200,
}
if response.status_code != 200:
result["checks"]["transaction_fetch"]["error"] = response.text[:500]
return result
data = response.json()
result["checks"]["transaction_data"] = {
"state": data.get("state"),
"documents_count": len(data.get("documents", [])),
"participants_count": len(data.get("participants", [])),
}
documents = data.get("documents", [])
result["checks"]["documents"] = []
for doc in documents:
doc_info = {
"id": doc.get("id"),
"name": doc.get("name"),
"status": doc.get("status"),
}
if doc.get("id"):
download_url = (
f"{self.api_url}/transactions/{transaction_id}"
f"/documents/{doc['id']}/download"
)
try:
dl_response = requests.head(
download_url,
auth=self.auth,
timeout=10,
)
doc_info["download_check"] = {
"url": download_url,
"status_code": dl_response.status_code,
"accessible": dl_response.status_code in [200, 302],
"content_type": dl_response.headers.get("Content-Type"),
}
except Exception as e:
doc_info["download_check"] = {"error": str(e)}
result["checks"]["documents"].append(doc_info)
result["success"] = True
except Exception as e:
result["success"] = False
result["error"] = str(e)
return result