714 lines
26 KiB
Python
714 lines
26 KiB
Python
import requests
|
||
import json
|
||
import logging
|
||
import uuid
|
||
from typing import Dict, Optional, Tuple
|
||
from datetime import datetime, timedelta
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy import select, and_, or_
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
from database import (
|
||
UniversignTransaction,
|
||
UniversignSigner,
|
||
UniversignSyncLog,
|
||
UniversignTransactionStatus,
|
||
LocalDocumentStatus,
|
||
UniversignSignerStatus,
|
||
EmailLog,
|
||
StatutEmail,
|
||
)
|
||
from data.data import templates_signature_email
|
||
from services.universign_document import UniversignDocumentService
|
||
from utils.universign_status_mapping import (
|
||
map_universign_to_local,
|
||
is_transition_allowed,
|
||
get_status_actions,
|
||
is_final_status,
|
||
resolve_status_conflict,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class UniversignSyncService:
|
||
def __init__(self, api_url: str, api_key: str, timeout: int = 30):
|
||
self.api_url = api_url.rstrip("/")
|
||
self.api_key = api_key
|
||
self.timeout = timeout
|
||
self.auth = (api_key, "")
|
||
self.sage_client = None
|
||
self.email_queue = None
|
||
self.settings = None
|
||
self.document_service = UniversignDocumentService(
|
||
api_url=api_url, api_key=api_key, timeout=60
|
||
)
|
||
|
||
def configure(self, sage_client, email_queue, settings):
|
||
self.sage_client = sage_client
|
||
self.email_queue = email_queue
|
||
self.settings = settings
|
||
|
||
def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
|
||
start_time = datetime.now()
|
||
|
||
try:
|
||
response = requests.get(
|
||
f"{self.api_url}/transactions/{transaction_id}",
|
||
auth=self.auth,
|
||
timeout=self.timeout,
|
||
headers={"Accept": "application/json"},
|
||
)
|
||
|
||
response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
logger.info(
|
||
f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)"
|
||
)
|
||
return {
|
||
"transaction": data,
|
||
"http_status": 200,
|
||
"response_time_ms": response_time_ms,
|
||
"fetched_at": datetime.now(),
|
||
}
|
||
|
||
elif response.status_code == 404:
|
||
logger.warning(
|
||
f"Transaction {transaction_id} introuvable sur Universign"
|
||
)
|
||
return None
|
||
|
||
else:
|
||
logger.error(
|
||
f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}"
|
||
)
|
||
return None
|
||
|
||
except requests.exceptions.Timeout:
|
||
logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
|
||
return None
|
||
|
||
async def sync_all_pending(
|
||
self, session: AsyncSession, max_transactions: int = 50
|
||
) -> Dict[str, int]:
|
||
query = (
|
||
select(UniversignTransaction)
|
||
.options(selectinload(UniversignTransaction.signers))
|
||
.where(
|
||
and_(
|
||
UniversignTransaction.needs_sync,
|
||
or_(
|
||
~UniversignTransaction.local_status.in_(
|
||
[
|
||
LocalDocumentStatus.SIGNED,
|
||
LocalDocumentStatus.REJECTED,
|
||
LocalDocumentStatus.EXPIRED,
|
||
]
|
||
),
|
||
UniversignTransaction.last_synced_at
|
||
< (datetime.now() - timedelta(hours=1)),
|
||
UniversignTransaction.last_synced_at.is_(None),
|
||
),
|
||
)
|
||
)
|
||
.order_by(UniversignTransaction.created_at.asc())
|
||
.limit(max_transactions)
|
||
)
|
||
|
||
result = await session.execute(query)
|
||
transactions = result.scalars().all()
|
||
|
||
stats = {
|
||
"total_found": len(transactions),
|
||
"success": 0,
|
||
"failed": 0,
|
||
"skipped": 0,
|
||
"status_changes": 0,
|
||
}
|
||
|
||
for transaction in transactions:
|
||
try:
|
||
previous_status = transaction.local_status.value
|
||
|
||
success, error = await self.sync_transaction(
|
||
session, transaction, force=False
|
||
)
|
||
|
||
if success:
|
||
stats["success"] += 1
|
||
if transaction.local_status.value != previous_status:
|
||
stats["status_changes"] += 1
|
||
else:
|
||
stats["failed"] += 1
|
||
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True
|
||
)
|
||
stats["failed"] += 1
|
||
|
||
logger.info(
|
||
f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés"
|
||
)
|
||
|
||
return stats
|
||
|
||
async def process_webhook(
|
||
self, session: AsyncSession, payload: Dict, transaction_id: str = None
|
||
) -> Tuple[bool, Optional[str]]:
|
||
"""
|
||
Traite un webhook Universign - CORRECTION : meilleure gestion des payloads
|
||
"""
|
||
try:
|
||
if not transaction_id:
|
||
if (
|
||
payload.get("type", "").startswith("transaction.")
|
||
and "payload" in payload
|
||
):
|
||
nested_object = payload.get("payload", {}).get("object", {})
|
||
if nested_object.get("object") == "transaction":
|
||
transaction_id = nested_object.get("id")
|
||
elif payload.get("type", "").startswith("action."):
|
||
transaction_id = (
|
||
payload.get("payload", {})
|
||
.get("object", {})
|
||
.get("transaction_id")
|
||
)
|
||
elif payload.get("object") == "transaction":
|
||
transaction_id = payload.get("id")
|
||
|
||
if not transaction_id:
|
||
return False, "Transaction ID manquant"
|
||
|
||
event_type = payload.get("type", "webhook")
|
||
|
||
logger.info(
|
||
f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}"
|
||
)
|
||
|
||
query = (
|
||
select(UniversignTransaction)
|
||
.options(selectinload(UniversignTransaction.signers))
|
||
.where(UniversignTransaction.transaction_id == transaction_id)
|
||
)
|
||
result = await session.execute(query)
|
||
transaction = result.scalar_one_or_none()
|
||
|
||
if not transaction:
|
||
logger.warning(f"Transaction {transaction_id} inconnue localement")
|
||
return False, "Transaction inconnue"
|
||
|
||
transaction.webhook_received = True
|
||
|
||
old_status = transaction.local_status.value
|
||
|
||
success, error = await self.sync_transaction(
|
||
session, transaction, force=True
|
||
)
|
||
|
||
if success and transaction.local_status.value != old_status:
|
||
logger.info(
|
||
f"Webhook traité: {transaction_id} | "
|
||
f"{old_status} → {transaction.local_status.value}"
|
||
)
|
||
|
||
await self._log_sync_attempt(
|
||
session=session,
|
||
transaction=transaction,
|
||
sync_type=f"webhook:{event_type}",
|
||
success=success,
|
||
error_message=error,
|
||
previous_status=old_status,
|
||
new_status=transaction.local_status.value,
|
||
changes=json.dumps(
|
||
payload, default=str
|
||
), # Ajout default=str pour éviter les erreurs JSON
|
||
)
|
||
|
||
await session.commit()
|
||
|
||
return success, error
|
||
|
||
except Exception as e:
|
||
logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True)
|
||
return False, str(e)
|
||
|
||
async def _sync_signers(
|
||
self,
|
||
session: AsyncSession,
|
||
transaction: UniversignTransaction,
|
||
universign_data: Dict,
|
||
):
|
||
signers_data = universign_data.get("participants", [])
|
||
if not signers_data:
|
||
signers_data = universign_data.get("signers", [])
|
||
|
||
if not signers_data:
|
||
logger.debug("Aucun signataire dans les données Universign")
|
||
return
|
||
|
||
existing_signers = {s.email: s for s in transaction.signers}
|
||
|
||
for idx, signer_data in enumerate(signers_data):
|
||
email = signer_data.get("email", "")
|
||
if not email:
|
||
logger.warning(f"Signataire sans email à l'index {idx}, ignoré")
|
||
continue
|
||
|
||
raw_status = signer_data.get("status") or signer_data.get(
|
||
"state", "waiting"
|
||
)
|
||
try:
|
||
status = UniversignSignerStatus(raw_status)
|
||
except ValueError:
|
||
logger.warning(
|
||
f"Statut inconnu pour signer {email}: {raw_status}, utilisation de 'unknown'"
|
||
)
|
||
status = UniversignSignerStatus.UNKNOWN
|
||
|
||
if email in existing_signers:
|
||
signer = existing_signers[email]
|
||
signer.status = status
|
||
|
||
viewed_at = self._parse_date(signer_data.get("viewed_at"))
|
||
if viewed_at and not signer.viewed_at:
|
||
signer.viewed_at = viewed_at
|
||
|
||
signed_at = self._parse_date(signer_data.get("signed_at"))
|
||
if signed_at and not signer.signed_at:
|
||
signer.signed_at = signed_at
|
||
|
||
refused_at = self._parse_date(signer_data.get("refused_at"))
|
||
if refused_at and not signer.refused_at:
|
||
signer.refused_at = refused_at
|
||
|
||
if signer_data.get("name") and not signer.name:
|
||
signer.name = signer_data.get("name")
|
||
else:
|
||
try:
|
||
signer = UniversignSigner(
|
||
id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}",
|
||
transaction_id=transaction.id,
|
||
email=email,
|
||
name=signer_data.get("name"),
|
||
status=status,
|
||
order_index=idx,
|
||
viewed_at=self._parse_date(signer_data.get("viewed_at")),
|
||
signed_at=self._parse_date(signer_data.get("signed_at")),
|
||
refused_at=self._parse_date(signer_data.get("refused_at")),
|
||
)
|
||
session.add(signer)
|
||
logger.info(
|
||
f"➕ Nouveau signataire ajouté: {email} (statut: {status.value})"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Erreur création signer {email}: {e}")
|
||
|
||
async def sync_transaction(
|
||
self,
|
||
session,
|
||
transaction,
|
||
force: bool = False,
|
||
):
|
||
import json
|
||
|
||
if is_final_status(transaction.local_status.value) and not force:
|
||
logger.debug(
|
||
f"⏭️ Skip {transaction.transaction_id}: statut final "
|
||
f"{transaction.local_status.value}"
|
||
)
|
||
transaction.needs_sync = False
|
||
await session.commit()
|
||
return True, None
|
||
|
||
logger.info(f"Synchronisation: {transaction.transaction_id}")
|
||
|
||
result = self.fetch_transaction_status(transaction.transaction_id)
|
||
|
||
if not result:
|
||
error = "Échec récupération données Universign"
|
||
logger.error(f" {error}: {transaction.transaction_id}")
|
||
transaction.sync_attempts += 1
|
||
transaction.sync_error = error
|
||
await self._log_sync_attempt(session, transaction, "polling", False, error)
|
||
await session.commit()
|
||
return False, error
|
||
|
||
try:
|
||
universign_data = result["transaction"]
|
||
universign_status_raw = universign_data.get("state", "draft")
|
||
|
||
logger.info(f" Statut Universign brut: {universign_status_raw}")
|
||
|
||
new_local_status = map_universign_to_local(universign_status_raw)
|
||
previous_local_status = transaction.local_status.value
|
||
|
||
logger.info(
|
||
f"Mapping: {universign_status_raw} (Universign) → "
|
||
f"{new_local_status} (Local) | Actuel: {previous_local_status}"
|
||
)
|
||
|
||
if not is_transition_allowed(previous_local_status, new_local_status):
|
||
logger.warning(
|
||
f"Transition refusée: {previous_local_status} → {new_local_status}"
|
||
)
|
||
new_local_status = resolve_status_conflict(
|
||
previous_local_status, new_local_status
|
||
)
|
||
logger.info(f"Résolution conflit: statut résolu = {new_local_status}")
|
||
|
||
status_changed = previous_local_status != new_local_status
|
||
|
||
if status_changed:
|
||
logger.info(
|
||
f"CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}"
|
||
)
|
||
|
||
try:
|
||
transaction.universign_status = UniversignTransactionStatus(
|
||
universign_status_raw
|
||
)
|
||
except ValueError:
|
||
logger.warning(f"Statut Universign inconnu: {universign_status_raw}")
|
||
if new_local_status == "SIGNE":
|
||
transaction.universign_status = (
|
||
UniversignTransactionStatus.COMPLETED
|
||
)
|
||
elif new_local_status == "REFUSE":
|
||
transaction.universign_status = UniversignTransactionStatus.REFUSED
|
||
elif new_local_status == "EXPIRE":
|
||
transaction.universign_status = UniversignTransactionStatus.EXPIRED
|
||
else:
|
||
transaction.universign_status = UniversignTransactionStatus.STARTED
|
||
|
||
transaction.local_status = LocalDocumentStatus(new_local_status)
|
||
transaction.universign_status_updated_at = datetime.now()
|
||
|
||
if new_local_status == "EN_COURS" and not transaction.sent_at:
|
||
transaction.sent_at = datetime.now()
|
||
logger.info("Date d'envoi mise à jour")
|
||
|
||
if new_local_status == "SIGNE" and not transaction.signed_at:
|
||
transaction.signed_at = datetime.now()
|
||
logger.info("Date de signature mise à jour")
|
||
|
||
if new_local_status == "REFUSE" and not transaction.refused_at:
|
||
transaction.refused_at = datetime.now()
|
||
logger.info(" Date de refus mise à jour")
|
||
|
||
if new_local_status == "EXPIRE" and not transaction.expired_at:
|
||
transaction.expired_at = datetime.now()
|
||
logger.info("Date d'expiration mise à jour")
|
||
|
||
documents = universign_data.get("documents", [])
|
||
if documents:
|
||
first_doc = documents[0]
|
||
logger.info(
|
||
f"Document Universign trouvé: id={first_doc.get('id')}, "
|
||
f"status={first_doc.get('status')}"
|
||
)
|
||
|
||
if new_local_status == "SIGNE" and not transaction.signed_document_path:
|
||
logger.info("Déclenchement téléchargement document signé...")
|
||
|
||
try:
|
||
(
|
||
download_success,
|
||
download_error,
|
||
) = await self.document_service.download_and_store_signed_document(
|
||
session=session, transaction=transaction, force=False
|
||
)
|
||
|
||
if download_success:
|
||
logger.info("Document signé téléchargé et stocké")
|
||
else:
|
||
logger.warning(f"Échec téléchargement: {download_error}")
|
||
|
||
except Exception as e:
|
||
logger.error(f" Erreur téléchargement document: {e}", exc_info=True)
|
||
|
||
await self._sync_signers(session, transaction, universign_data)
|
||
|
||
transaction.last_synced_at = datetime.now()
|
||
transaction.sync_attempts += 1
|
||
transaction.needs_sync = not is_final_status(new_local_status)
|
||
transaction.sync_error = None
|
||
|
||
await self._log_sync_attempt(
|
||
session=session,
|
||
transaction=transaction,
|
||
sync_type="polling",
|
||
success=True,
|
||
error_message=None,
|
||
previous_status=previous_local_status,
|
||
new_status=new_local_status,
|
||
changes=json.dumps(
|
||
{
|
||
"status_changed": status_changed,
|
||
"universign_raw": universign_status_raw,
|
||
"documents_count": len(documents),
|
||
"response_time_ms": result.get("response_time_ms"),
|
||
},
|
||
default=str,
|
||
),
|
||
)
|
||
|
||
await session.commit()
|
||
|
||
if status_changed:
|
||
logger.info(f"🎬 Exécution actions pour statut: {new_local_status}")
|
||
await self._execute_status_actions(
|
||
session, transaction, new_local_status
|
||
)
|
||
|
||
logger.info(
|
||
f"Sync terminée: {transaction.transaction_id} | "
|
||
f"{previous_local_status} → {new_local_status}"
|
||
)
|
||
|
||
return True, None
|
||
|
||
except Exception as e:
|
||
error_msg = f"Erreur lors de la synchronisation: {str(e)}"
|
||
logger.error(f" {error_msg}", exc_info=True)
|
||
|
||
transaction.sync_error = error_msg[:1000]
|
||
transaction.sync_attempts += 1
|
||
|
||
await self._log_sync_attempt(
|
||
session, transaction, "polling", False, error_msg
|
||
)
|
||
await session.commit()
|
||
|
||
return False, error_msg
|
||
|
||
async def _sync_transaction_documents_corrected(
|
||
self, session, transaction, universign_data: dict, new_local_status: str
|
||
):
|
||
documents = universign_data.get("documents", [])
|
||
|
||
if documents:
|
||
first_doc = documents[0]
|
||
first_doc_id = first_doc.get("id")
|
||
|
||
if first_doc_id:
|
||
if hasattr(transaction, "universign_document_id"):
|
||
transaction.universign_document_id = first_doc_id
|
||
|
||
logger.info(
|
||
f"Document Universign: id={first_doc_id}, "
|
||
f"name={first_doc.get('name')}, status={first_doc.get('status')}"
|
||
)
|
||
else:
|
||
logger.debug("Aucun document dans la réponse Universign")
|
||
|
||
if new_local_status == "SIGNE":
|
||
if not transaction.signed_document_path:
|
||
logger.info("Déclenchement téléchargement document signé...")
|
||
|
||
try:
|
||
(
|
||
download_success,
|
||
download_error,
|
||
) = await self.document_service.download_and_store_signed_document(
|
||
session=session, transaction=transaction, force=False
|
||
)
|
||
|
||
if download_success:
|
||
logger.info("Document signé téléchargé avec succès")
|
||
else:
|
||
logger.warning(f"Échec téléchargement: {download_error}")
|
||
|
||
except Exception as e:
|
||
logger.error(f" Erreur téléchargement document: {e}", exc_info=True)
|
||
else:
|
||
logger.debug(
|
||
f"Document déjà téléchargé: {transaction.signed_document_path}"
|
||
)
|
||
|
||
async def _log_sync_attempt(
|
||
self,
|
||
session: AsyncSession,
|
||
transaction: UniversignTransaction,
|
||
sync_type: str,
|
||
success: bool,
|
||
error_message: Optional[str] = None,
|
||
previous_status: Optional[str] = None,
|
||
new_status: Optional[str] = None,
|
||
changes: Optional[str] = None,
|
||
):
|
||
log = UniversignSyncLog(
|
||
transaction_id=transaction.id,
|
||
sync_type=sync_type,
|
||
sync_timestamp=datetime.now(),
|
||
previous_status=previous_status,
|
||
new_status=new_status,
|
||
changes_detected=changes,
|
||
success=success,
|
||
error_message=error_message,
|
||
)
|
||
session.add(log)
|
||
|
||
async def _execute_status_actions(
|
||
self, session: AsyncSession, transaction: UniversignTransaction, new_status: str
|
||
):
|
||
actions = get_status_actions(new_status)
|
||
if not actions:
|
||
return
|
||
|
||
if actions.get("update_sage_status") and self.sage_client:
|
||
await self._update_sage_status(transaction, new_status)
|
||
elif actions.get("update_sage_status"):
|
||
logger.debug(
|
||
f"sage_client non configuré, skip MAJ Sage pour {transaction.sage_document_id}"
|
||
)
|
||
|
||
if actions.get("send_notification") and self.email_queue and self.settings:
|
||
await self._send_notification(session, transaction, new_status)
|
||
elif actions.get("send_notification"):
|
||
logger.debug(
|
||
f"email_queue/settings non configuré, skip notification pour {transaction.transaction_id}"
|
||
)
|
||
|
||
async def _update_sage_status(
|
||
self, transaction: UniversignTransaction, status: str
|
||
):
|
||
if not self.sage_client:
|
||
logger.warning("sage_client non configuré pour mise à jour Sage")
|
||
return
|
||
|
||
try:
|
||
type_doc = transaction.sage_document_type.value
|
||
doc_id = transaction.sage_document_id
|
||
|
||
if status == "SIGNE":
|
||
self.sage_client.changer_statut_document(
|
||
document_type_code=type_doc, numero=doc_id, nouveau_statut=2
|
||
)
|
||
logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)")
|
||
|
||
elif status == "EN_COURS":
|
||
self.sage_client.changer_statut_document(
|
||
document_type_code=type_doc, numero=doc_id, nouveau_statut=1
|
||
)
|
||
logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)")
|
||
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}"
|
||
)
|
||
|
||
async def _send_notification(
|
||
self, session: AsyncSession, transaction: UniversignTransaction, status: str
|
||
):
|
||
if not self.email_queue or not self.settings:
|
||
logger.warning("email_queue ou settings non configuré")
|
||
return
|
||
|
||
try:
|
||
if status == "SIGNE":
|
||
template = templates_signature_email["signature_confirmee"]
|
||
|
||
type_labels = {
|
||
0: "Devis",
|
||
10: "Commande",
|
||
30: "Bon de Livraison",
|
||
60: "Facture",
|
||
50: "Avoir",
|
||
}
|
||
|
||
variables = {
|
||
"NOM_SIGNATAIRE": transaction.requester_name or "Client",
|
||
"TYPE_DOC": type_labels.get(
|
||
transaction.sage_document_type.value, "Document"
|
||
),
|
||
"NUMERO": transaction.sage_document_id,
|
||
"DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M")
|
||
if transaction.signed_at
|
||
else datetime.now().strftime("%d/%m/%Y à %H:%M"),
|
||
"TRANSACTION_ID": transaction.transaction_id,
|
||
"CONTACT_EMAIL": self.settings.smtp_from,
|
||
}
|
||
|
||
sujet = template["sujet"]
|
||
corps = template["corps_html"]
|
||
|
||
for var, valeur in variables.items():
|
||
sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur))
|
||
corps = corps.replace(f"{{{{{var}}}}}", str(valeur))
|
||
|
||
email_log = EmailLog(
|
||
id=str(uuid.uuid4()),
|
||
destinataire=transaction.requester_email,
|
||
sujet=sujet,
|
||
corps_html=corps,
|
||
document_ids=transaction.sage_document_id,
|
||
type_document=transaction.sage_document_type.value,
|
||
statut=StatutEmail.EN_ATTENTE,
|
||
date_creation=datetime.now(),
|
||
nb_tentatives=0,
|
||
)
|
||
|
||
session.add(email_log)
|
||
await session.flush()
|
||
|
||
self.email_queue.enqueue(email_log.id)
|
||
|
||
logger.info(
|
||
f"Email confirmation signature envoyé à {transaction.requester_email}"
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Erreur envoi notification pour {transaction.transaction_id}: {e}"
|
||
)
|
||
|
||
@staticmethod
|
||
def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
|
||
if not date_str:
|
||
return None
|
||
try:
|
||
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
class UniversignSyncScheduler:
|
||
def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5):
|
||
self.sync_service = sync_service
|
||
self.interval_minutes = interval_minutes
|
||
self.is_running = False
|
||
|
||
async def start(self, session_factory):
|
||
import asyncio
|
||
|
||
self.is_running = True
|
||
|
||
logger.info(
|
||
f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)"
|
||
)
|
||
|
||
while self.is_running:
|
||
try:
|
||
async with session_factory() as session:
|
||
stats = await self.sync_service.sync_all_pending(session)
|
||
|
||
logger.info(
|
||
f"Polling: {stats['success']} transactions synchronisées, "
|
||
f"{stats['status_changes']} changements"
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Erreur polling: {e}", exc_info=True)
|
||
|
||
await asyncio.sleep(self.interval_minutes * 60)
|
||
|
||
def stop(self):
|
||
self.is_running = False
|
||
logger.info("Arrêt polling Universign")
|