feat: Add API endpoints for Universign e-signature management, batch email sending, and quote contact retrieval.

This commit is contained in:
Fanilo-Nantenaina 2025-11-27 17:16:51 +03:00
parent df6e09af07
commit cba39ad7ec

663
api.py
View file

@ -644,6 +644,293 @@ async def envoyer_signature(
raise HTTPException(500, str(e))
@app.get("/signature/universign/status", tags=["US-A3"])
async def statut_signature(docId: str = Query(...)):
"""🔍 Récupération du statut de signature en temps réel"""
# Chercher dans la DB locale
try:
async with async_session_factory() as session:
query = select(SignatureLog).where(SignatureLog.document_id == docId)
result = await session.execute(query)
signature_log = result.scalar_one_or_none()
if not signature_log:
raise HTTPException(404, "Signature introuvable")
# Interroger Universign
statut = await universign_statut(signature_log.transaction_id)
return {
"doc_id": docId,
"statut": statut["statut"],
"date_signature": statut.get("date_signature"),
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur statut signature: {e}")
raise HTTPException(500, str(e))
@app.get("/signatures", tags=["US-A3"])
async def lister_signatures(
statut: Optional[StatutSignature] = Query(None),
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""📋 Liste toutes les demandes de signature"""
query = select(SignatureLog).order_by(SignatureLog.date_envoi.desc())
if statut:
statut_db = StatutSignatureEnum[statut.value]
query = query.where(SignatureLog.statut == statut_db)
query = query.limit(limit)
result = await session.execute(query)
signatures = result.scalars().all()
return [
{
"id": sig.id,
"document_id": sig.document_id,
"type_document": sig.type_document.value,
"transaction_id": sig.transaction_id,
"signer_url": sig.signer_url,
"email_signataire": sig.email_signataire,
"nom_signataire": sig.nom_signataire,
"statut": sig.statut.value,
"date_envoi": sig.date_envoi.isoformat() if sig.date_envoi else None,
"date_signature": (
sig.date_signature.isoformat() if sig.date_signature else None
),
"est_relance": sig.est_relance,
"nb_relances": sig.nb_relances or 0,
}
for sig in signatures
]
@app.get("/signatures/{transaction_id}/status", tags=["US-A3"])
async def statut_signature_detail(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""🔍 Récupération du statut détaillé d'une signature"""
query = select(SignatureLog).where(SignatureLog.transaction_id == transaction_id)
result = await session.execute(query)
signature_log = result.scalar_one_or_none()
if not signature_log:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
# Interroger Universign
statut_universign = await universign_statut(transaction_id)
if statut_universign.get("statut") != "ERREUR":
statut_map = {
"EN_ATTENTE": StatutSignatureEnum.EN_ATTENTE,
"ENVOYE": StatutSignatureEnum.ENVOYE,
"SIGNE": StatutSignatureEnum.SIGNE,
"REFUSE": StatutSignatureEnum.REFUSE,
"EXPIRE": StatutSignatureEnum.EXPIRE,
}
nouveau_statut = statut_map.get(
statut_universign["statut"], StatutSignatureEnum.EN_ATTENTE
)
signature_log.statut = nouveau_statut
if statut_universign.get("date_signature"):
signature_log.date_signature = datetime.fromisoformat(
statut_universign["date_signature"].replace("Z", "+00:00")
)
await session.commit()
return {
"transaction_id": transaction_id,
"document_id": signature_log.document_id,
"statut": signature_log.statut.value,
"email_signataire": signature_log.email_signataire,
"date_envoi": (
signature_log.date_envoi.isoformat() if signature_log.date_envoi else None
),
"date_signature": (
signature_log.date_signature.isoformat()
if signature_log.date_signature
else None
),
"signer_url": signature_log.signer_url,
}
@app.post("/signatures/refresh-all", tags=["US-A3"])
async def rafraichir_statuts_signatures(session: AsyncSession = Depends(get_session)):
"""🔄 Rafraîchit TOUS les statuts des signatures en attente"""
query = select(SignatureLog).where(
SignatureLog.statut.in_(
[StatutSignatureEnum.EN_ATTENTE, StatutSignatureEnum.ENVOYE]
)
)
result = await session.execute(query)
signatures = result.scalars().all()
nb_mises_a_jour = 0
for sig in signatures:
try:
statut_universign = await universign_statut(sig.transaction_id)
if statut_universign.get("statut") != "ERREUR":
statut_map = {
"SIGNE": StatutSignatureEnum.SIGNE,
"REFUSE": StatutSignatureEnum.REFUSE,
"EXPIRE": StatutSignatureEnum.EXPIRE,
}
nouveau = statut_map.get(statut_universign["statut"])
if nouveau and nouveau != sig.statut:
sig.statut = nouveau
if statut_universign.get("date_signature"):
sig.date_signature = datetime.fromisoformat(
statut_universign["date_signature"].replace("Z", "+00:00")
)
nb_mises_a_jour += 1
except Exception as e:
logger.error(f"Erreur refresh signature {sig.transaction_id}: {e}")
continue
await session.commit()
return {
"success": True,
"nb_signatures_verifiees": len(signatures),
"nb_mises_a_jour": nb_mises_a_jour,
}
@app.post("/devis/{id}/signer", tags=["US-A3"])
async def envoyer_devis_signature(
id: str, request: SignatureRequest, session: AsyncSession = Depends(get_session)
):
"""✏️ Envoi d'un devis pour signature électronique"""
try:
# Vérifier devis via gateway Windows
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
# Générer PDF
pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS)
# Envoi Universign
resultat = await universign_envoyer(
id, pdf_bytes, request.email_signataire, request.nom_signataire
)
if "error" in resultat:
raise HTTPException(500, f"Erreur Universign: {resultat['error']}")
# Logger en DB
signature_log = SignatureLog(
id=str(uuid.uuid4()),
document_id=id,
type_document=TypeDocument.DEVIS,
transaction_id=resultat["transaction_id"],
signer_url=resultat["signer_url"],
email_signataire=request.email_signataire,
nom_signataire=request.nom_signataire,
statut=StatutSignatureEnum.ENVOYE,
date_envoi=datetime.now(),
)
session.add(signature_log)
await session.commit()
# MAJ champ libre Sage via gateway
sage_client.mettre_a_jour_champ_libre(
id, TypeDocument.DEVIS, "UniversignID", resultat["transaction_id"]
)
return {
"success": True,
"devis_id": id,
"transaction_id": resultat["transaction_id"],
"signer_url": resultat["signer_url"],
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur envoi signature: {e}")
raise HTTPException(500, str(e))
# ============================================
# US-A4 - ENVOI EMAILS EN LOT
# ============================================
class EmailBatchRequest(BaseModel):
destinataires: List[EmailStr] = Field(..., min_length=1, max_length=100)
sujet: str = Field(..., min_length=1, max_length=500)
corps_html: str = Field(..., min_length=1)
document_ids: Optional[List[str]] = None
type_document: Optional[TypeDocument] = None
@app.post("/emails/send-batch", tags=["US-A4"])
async def envoyer_emails_lot(
batch: EmailBatchRequest, session: AsyncSession = Depends(get_session)
):
"""📧 US-A4: Envoi groupé via email_queue"""
resultats = []
for destinataire in batch.destinataires:
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=destinataire,
sujet=batch.sujet,
corps_html=batch.corps_html,
document_ids=",".join(batch.document_ids) if batch.document_ids else None,
type_document=batch.type_document,
statut=StatutEmailEnum.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.flush()
email_queue.enqueue(email_log.id)
resultats.append(
{
"destinataire": destinataire,
"log_id": email_log.id,
"statut": "EN_ATTENTE",
}
)
await session.commit()
nb_documents = len(batch.document_ids) if batch.document_ids else 0
logger.info(
f"{len(batch.destinataires)} emails mis en file avec {nb_documents} docs"
)
return {
"total": len(batch.destinataires),
"succes": len(batch.destinataires),
"documents_attaches": nb_documents,
"details": resultats,
}
# =====================================================
# ENDPOINTS - US-A5
# =====================================================
@ -749,6 +1036,41 @@ async def relancer_devis_signature(
raise HTTPException(500, str(e))
class ContactClientResponse(BaseModel):
client_code: str
client_intitule: str
email: Optional[str]
nom: Optional[str]
telephone: Optional[str]
peut_etre_relance: bool
@app.get("/devis/{id}/contact", response_model=ContactClientResponse, tags=["US-A6"])
async def recuperer_contact_devis(id: str):
"""👤 US-A6: Récupération du contact client associé au devis"""
try:
# Lire devis via gateway Windows
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
# Lire contact via gateway Windows
contact = sage_client.lire_contact_client(devis["client_code"])
if not contact:
raise HTTPException(
404, f"Contact introuvable pour client {devis['client_code']}"
)
peut_relancer = bool(contact.get("email"))
return ContactClientResponse(**contact, peut_etre_relance=peut_relancer)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur récupération contact: {e}")
raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A7
# =====================================================
@ -771,6 +1093,347 @@ async def lister_factures(
raise HTTPException(500, str(e))
class RelanceFactureRequest(BaseModel):
doc_id: str
message_personnalise: Optional[str] = None
# Templates email (si pas déjà définis)
templates_email_db = {
"relance_facture": {
"id": "relance_facture",
"nom": "Relance Facture",
"sujet": "Rappel - Facture {{DO_Piece}}",
"corps_html": """
<p>Bonjour {{CT_Intitule}},</p>
<p>La facture <strong>{{DO_Piece}}</strong> du {{DO_Date}}
d'un montant de <strong>{{DO_TotalTTC}}€ TTC</strong> reste impayée.</p>
<p>Merci de régulariser dans les meilleurs délais.</p>
<p>Cordialement,</p>
""",
"variables_disponibles": [
"DO_Piece",
"DO_Date",
"CT_Intitule",
"DO_TotalHT",
"DO_TotalTTC",
],
}
}
@app.post("/factures/{id}/relancer", tags=["US-A7"])
async def relancer_facture(
id: str,
relance: RelanceFactureRequest,
session: AsyncSession = Depends(get_session),
):
"""💸 US-A7: Relance facture en un clic"""
try:
# Lire facture via gateway Windows
facture = sage_client.lire_document(id, TypeDocument.FACTURE)
if not facture:
raise HTTPException(404, f"Facture {id} introuvable")
# Récupérer contact via gateway Windows
contact = sage_client.lire_contact_client(facture["client_code"])
if not contact or not contact.get("email"):
raise HTTPException(400, "Aucun email trouvé pour ce client")
# Préparer email
template = templates_email_db["relance_facture"]
variables = {
"DO_Piece": facture.get("numero", id),
"DO_Date": str(facture.get("date", "")),
"CT_Intitule": facture.get("client_intitule", ""),
"DO_TotalHT": f"{facture.get('total_ht', 0):.2f}",
"DO_TotalTTC": f"{facture.get('total_ttc', 0):.2f}",
}
sujet = template["sujet"]
corps = relance.message_personnalise or template["corps_html"]
for var, valeur in variables.items():
sujet = sujet.replace(f"{{{{{var}}}}}", valeur)
corps = corps.replace(f"{{{{{var}}}}}", valeur)
# Créer log email
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=contact["email"],
sujet=sujet,
corps_html=corps,
document_ids=id,
type_document=TypeDocument.FACTURE,
statut=StatutEmailEnum.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.flush()
# Enqueue
email_queue.enqueue(email_log.id)
# ✅ MAJ champ libre via gateway Windows
sage_client.mettre_a_jour_derniere_relance(id, TypeDocument.FACTURE)
await session.commit()
logger.info(f"✅ Relance facture: {id}{contact['email']}")
return {
"success": True,
"facture_id": id,
"email_log_id": email_log.id,
"destinataire": contact["email"],
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur relance facture: {e}")
raise HTTPException(500, str(e))
# ============================================
# US-A9 - JOURNAL DES E-MAILS
# ============================================
@app.get("/emails/logs", tags=["US-A9"])
async def journal_emails(
statut: Optional[StatutEmail] = Query(None),
destinataire: Optional[str] = Query(None),
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""📋 US-A9: Journal des e-mails envoyés"""
query = select(EmailLog)
if statut:
query = query.where(EmailLog.statut == StatutEmailEnum[statut.value])
if destinataire:
query = query.where(EmailLog.destinataire.contains(destinataire))
query = query.order_by(EmailLog.date_creation.desc()).limit(limit)
result = await session.execute(query)
logs = result.scalars().all()
return [
{
"id": log.id,
"destinataire": log.destinataire,
"sujet": log.sujet,
"statut": log.statut.value,
"date_creation": log.date_creation.isoformat(),
"date_envoi": log.date_envoi.isoformat() if log.date_envoi else None,
"nb_tentatives": log.nb_tentatives,
"derniere_erreur": log.derniere_erreur,
"document_ids": log.document_ids,
}
for log in logs
]
@app.get("/emails/logs/export", tags=["US-A9"])
async def exporter_logs_csv(
statut: Optional[StatutEmail] = Query(None),
session: AsyncSession = Depends(get_session),
):
"""📥 US-A9: Export CSV des logs d'envoi"""
query = select(EmailLog)
if statut:
query = query.where(EmailLog.statut == StatutEmailEnum[statut.value])
query = query.order_by(EmailLog.date_creation.desc())
result = await session.execute(query)
logs = result.scalars().all()
# Génération CSV
output = io.StringIO()
writer = csv.writer(output)
# En-têtes
writer.writerow(
[
"ID",
"Destinataire",
"Sujet",
"Statut",
"Date Création",
"Date Envoi",
"Nb Tentatives",
"Erreur",
"Documents",
]
)
# Données
for log in logs:
writer.writerow(
[
log.id,
log.destinataire,
log.sujet,
log.statut.value,
log.date_creation.isoformat(),
log.date_envoi.isoformat() if log.date_envoi else "",
log.nb_tentatives,
log.derniere_erreur or "",
log.document_ids or "",
]
)
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename=emails_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
},
)
# ============================================
# US-A10 - MODÈLES D'E-MAILS
# ============================================
class TemplateEmail(BaseModel):
id: Optional[str] = None
nom: str
sujet: str
corps_html: str
variables_disponibles: List[str] = []
class TemplatePreviewRequest(BaseModel):
template_id: str
document_id: str
type_document: TypeDocument
@app.get("/templates/emails", response_model=List[TemplateEmail], tags=["US-A10"])
async def lister_templates():
"""📧 US-A10: Liste tous les templates d'emails"""
return [TemplateEmail(**template) for template in templates_email_db.values()]
@app.get(
"/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"]
)
async def lire_template(template_id: str):
"""📖 Lecture d'un template par ID"""
if template_id not in templates_email_db:
raise HTTPException(404, f"Template {template_id} introuvable")
return TemplateEmail(**templates_email_db[template_id])
@app.post("/templates/emails", response_model=TemplateEmail, tags=["US-A10"])
async def creer_template(template: TemplateEmail):
""" Création d'un nouveau template"""
template_id = str(uuid.uuid4())
templates_email_db[template_id] = {
"id": template_id,
"nom": template.nom,
"sujet": template.sujet,
"corps_html": template.corps_html,
"variables_disponibles": template.variables_disponibles,
}
logger.info(f"Template créé: {template_id}")
return TemplateEmail(id=template_id, **template.dict())
@app.put(
"/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"]
)
async def modifier_template(template_id: str, template: TemplateEmail):
"""✏️ Modification d'un template existant"""
if template_id not in templates_email_db:
raise HTTPException(404, f"Template {template_id} introuvable")
# Ne pas modifier les templates système
if template_id in ["relance_devis", "relance_facture"]:
raise HTTPException(400, "Les templates système ne peuvent pas être modifiés")
templates_email_db[template_id] = {
"id": template_id,
"nom": template.nom,
"sujet": template.sujet,
"corps_html": template.corps_html,
"variables_disponibles": template.variables_disponibles,
}
logger.info(f"Template modifié: {template_id}")
return TemplateEmail(id=template_id, **template.dict())
@app.delete("/templates/emails/{template_id}", tags=["US-A10"])
async def supprimer_template(template_id: str):
"""🗑️ Suppression d'un template"""
if template_id not in templates_email_db:
raise HTTPException(404, f"Template {template_id} introuvable")
if template_id in ["relance_devis", "relance_facture"]:
raise HTTPException(400, "Les templates système ne peuvent pas être supprimés")
del templates_email_db[template_id]
logger.info(f"Template supprimé: {template_id}")
return {"success": True, "message": f"Template {template_id} supprimé"}
@app.post("/templates/emails/preview", tags=["US-A10"])
async def previsualiser_email(preview: TemplatePreviewRequest):
"""👁️ US-A10: Prévisualisation email avec fusion variables"""
if preview.template_id not in templates_email_db:
raise HTTPException(404, f"Template {preview.template_id} introuvable")
template = templates_email_db[preview.template_id]
# Lire document via gateway Windows
doc = sage_client.lire_document(preview.document_id, preview.type_document)
if not doc:
raise HTTPException(404, f"Document {preview.document_id} introuvable")
# Variables
variables = {
"DO_Piece": doc.get("numero", preview.document_id),
"DO_Date": str(doc.get("date", "")),
"CT_Intitule": doc.get("client_intitule", ""),
"DO_TotalHT": f"{doc.get('total_ht', 0):.2f}",
"DO_TotalTTC": f"{doc.get('total_ttc', 0):.2f}",
}
# Fusion
sujet_preview = template["sujet"]
corps_preview = template["corps_html"]
for var, valeur in variables.items():
sujet_preview = sujet_preview.replace(f"{{{{{var}}}}}", valeur)
corps_preview = corps_preview.replace(f"{{{{{var}}}}}", valeur)
return {
"template_id": preview.template_id,
"document_id": preview.document_id,
"sujet": sujet_preview,
"corps_html": corps_preview,
"variables_utilisees": variables,
}
# =====================================================
# ENDPOINTS - HEALTH
# =====================================================