gateway/modules/shared/notifyMandateAdmins.py
2026-04-26 08:31:35 +02:00

283 lines
10 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Central notification utility for mandate administrators.
All mandate-level notifications (subscription changes, billing warnings, etc.)
MUST go through notifyMandateAdmins() to ensure consistent recipient resolution
and delivery.
Recipients: all users with the mandate-level "admin" RBAC role.
"""
from __future__ import annotations
import html
import json
import logging
from typing import Any, Dict, List, Optional, Set
from modules.datamodels.datamodelMessaging import MessagingChannel
from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
logger = logging.getLogger(__name__)
# ============================================================================
# Recipient resolution
# ============================================================================
def _normalizeEmailList(raw: Any) -> List[str]:
"""Parse notifyEmails which can be a list, JSON string, or single address."""
if raw is None:
return []
if isinstance(raw, list):
return [str(e).strip().lower() for e in raw if str(e).strip()]
if isinstance(raw, str):
s = raw.strip()
if not s:
return []
if s.startswith("["):
try:
parsed = json.loads(s)
if isinstance(parsed, list):
return [str(e).strip().lower() for e in parsed if str(e).strip()]
except Exception:
pass
return [s.lower()]
return []
def _resolveMandateContactEmails(mandateId: str) -> List[str]:
"""Get the configured notifyEmails from BillingSettings."""
try:
from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
from modules.security.rootAccess import getRootUser
billingIf = getBillingInterface(getRootUser(), mandateId)
settings = billingIf.getSettings(mandateId) or {}
return _normalizeEmailList(settings.get("notifyEmails"))
except Exception as e:
logger.warning("Could not resolve BillingSettings.notifyEmails for mandate %s: %s", mandateId, e)
return []
def _resolveMandateAdminEmails(mandateId: str) -> List[str]:
"""Resolve all admin users of a mandate via RBAC and return their emails."""
emails: List[str] = []
try:
from modules.interfaces.interfaceDbApp import getRootInterface
rootIf = getRootInterface()
userMandates = rootIf.getUserMandatesByMandate(mandateId)
for um in userMandates:
if not getattr(um, "enabled", True):
continue
umId = str(getattr(um, "id", ""))
userId = getattr(um, "userId", None)
if not userId:
continue
roleIds = rootIf.getRoleIdsForUserMandate(umId)
isAdmin = False
for roleId in roleIds:
role = rootIf.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
isAdmin = True
break
if not isAdmin:
continue
user = rootIf.getUser(str(userId))
if user and user.email:
emails.append(user.email.strip().lower())
except Exception as e:
logger.warning("Could not resolve admin emails for mandate %s: %s", mandateId, e)
return emails
def _resolveAllRecipients(mandateId: str) -> List[str]:
    """Return the unique, non-empty admin emails of a mandate in first-seen order.

    Only RBAC-resolved mandate admins are considered as recipients.
    """
    adminEmails = _resolveMandateAdminEmails(mandateId)
    # dict keys keep insertion order, giving an order-preserving dedupe.
    return list(dict.fromkeys(address for address in adminEmails if address))
# ============================================================================
# Mandate name resolution
# ============================================================================
def resolveMandateName(mandateId: str) -> str:
    """Look up a display name for the mandate.

    Preference order: the record's ``label``, then its ``name``, then the
    first eight characters of ``mandateId``. Lookup errors are logged as a
    warning and also lead to the short-id fallback.
    """
    try:
        from modules.datamodels.datamodelUam import Mandate
        from modules.security.rootAccess import getRootDbAppConnector

        connector = getRootDbAppConnector()
        matches = connector.getRecordset(Mandate, recordFilter={"id": mandateId})
        if matches:
            for fieldName in ("label", "name"):
                value = matches[0].get(fieldName)
                if value:
                    return value
            return mandateId[:8]
    except Exception as err:
        logger.warning("Could not resolve mandate name for %s: %s", mandateId, err)
    # No record found (or lookup failed): use the abbreviated id.
    return mandateId[:8]
# ============================================================================
# HTML email rendering
# ============================================================================
def _getOperatorInfo() -> Dict[str, str]:
"""Load operator company data from config.ini."""
try:
from modules.shared.configuration import APP_CONFIG
return {
"companyName": APP_CONFIG.get("Operator_CompanyName", ""),
"address": APP_CONFIG.get("Operator_Address", ""),
"vatNumber": APP_CONFIG.get("Operator_VatNumber", ""),
}
except Exception:
return {"companyName": "", "address": "", "vatNumber": ""}
def renderHtmlEmail(
    headline: str,
    bodyParagraphs: List[str],
    mandateName: str,
    footerNote: Optional[str] = None,
    rawHtmlBlock: Optional[str] = None,
) -> str:
    """Build the complete HTML document for a notification email.

    Args:
        headline: Heading shown at the top of the body (HTML-escaped).
        bodyParagraphs: Plain-text paragraphs; each is HTML-escaped and its
            newlines become ``<br>``.
        mandateName: Mandate display name shown under the headline (HTML-escaped).
        footerNote: Optional small-print line below the content (HTML-escaped).
        rawHtmlBlock: Optional HTML inserted after the paragraphs WITHOUT
            escaping (e.g. an invoice table).

    Returns:
        The rendered HTML document as a string.
    """
    headlineHtml = html.escape(headline)
    mandateHtml = html.escape(mandateName)

    paragraphBlock = "".join(
        '<p style="margin: 0 0 14px 0; color: #333333;">'
        + html.escape(paragraph).replace("\n", "<br>")
        + "</p>\n"
        for paragraph in bodyParagraphs
    )

    # Inserted verbatim: this block is not escaped.
    rawSection = f'<div style="margin: 16px 0;">{rawHtmlBlock}</div>\n' if rawHtmlBlock else ""

    if footerNote:
        footerHtml = (
            '<p style="margin: 16px 0 0 0; font-size: 13px; color: #888888;">'
            + html.escape(footerNote)
            + "</p>\n"
        )
    else:
        footerHtml = ""

    # Operator line is only rendered when at least one detail is configured.
    operatorInfo = _getOperatorInfo()
    operatorDetails = [
        detail
        for detail in (operatorInfo[key] for key in ("companyName", "address", "vatNumber"))
        if detail
    ]
    if operatorDetails:
        operatorHtml = (
            '<p style="margin: 4px 0 0 0; font-size: 11px; color: #b0b0b0; text-align: center;">'
            + html.escape(" | ".join(operatorDetails))
            + "</p>\n"
        )
    else:
        operatorHtml = ""

    return f"""<!DOCTYPE html>
<html lang="de">
<head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body style="margin: 0; padding: 0; background-color: #f4f4f7; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif;">
<table role="presentation" width="100%" cellpadding="0" cellspacing="0" style="background-color: #f4f4f7; padding: 32px 16px;">
<tr><td align="center">
<table role="presentation" width="560" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,0.08);">
<!-- Header -->
<tr><td style="background-color: #1a1a2e; padding: 24px 32px;">
<h1 style="margin: 0; font-size: 18px; font-weight: 600; color: #ffffff;">PowerOn</h1>
</td></tr>
<!-- Body -->
<tr><td style="padding: 32px;">
<h2 style="margin: 0 0 8px 0; font-size: 20px; font-weight: 600; color: #1a1a2e;">{headlineHtml}</h2>
<p style="margin: 0 0 24px 0; font-size: 14px; color: #6b7280;">Mandant: <strong>{mandateHtml}</strong></p>
<div style="font-size: 15px; line-height: 1.6;">
{paragraphBlock}
{rawSection}
</div>
{footerHtml}
</td></tr>
<!-- Footer -->
<tr><td style="padding: 16px 32px; background-color: #f9fafb; border-top: 1px solid #e5e7eb;">
<p style="margin: 0; font-size: 12px; color: #9ca3af; text-align: center;">
Diese E-Mail wurde automatisch von PowerOn versendet.
</p>
{operatorHtml}
</td></tr>
</table>
</td></tr>
</table>
</body>
</html>"""
# ============================================================================
# Public API
# ============================================================================
def notifyMandateAdmins(
    mandateId: str,
    subject: str,
    headline: str,
    bodyParagraphs: List[str],
    *,
    footerNote: Optional[str] = None,
    rawHtmlBlock: Optional[str] = None,
) -> int:
    """Send a styled HTML notification email to every admin of a mandate.

    Recipients are the deduplicated emails of users holding the mandate-level
    "admin" role. The message is rendered once and sent to each recipient
    individually, so a failure for one address does not affect the others.

    Args:
        mandateId: The mandate to notify admins for. Falsy values send nothing.
        subject: Email subject line.
        headline: Bold headline inside the email body.
        bodyParagraphs: List of paragraph strings (plain text, auto-escaped).
        footerNote: Optional small-print note below the main content.
        rawHtmlBlock: Optional pre-formatted HTML block (e.g. invoice summary
            table); inserted into the email without escaping.

    Returns:
        Number of recipients that were successfully notified.
    """
    if not mandateId:
        return 0

    recipients = _resolveAllRecipients(mandateId)
    if not recipients:
        # Recipients come from RBAC admins only, so the message must not
        # suggest that BillingSettings.notifyEmails were consulted.
        logger.warning(
            "notifyMandateAdmins: no recipients found for mandate %s "
            "(no admin users with email)",
            mandateId,
        )
        return 0

    mandateName = resolveMandateName(mandateId)
    htmlMessage = renderHtmlEmail(headline, bodyParagraphs, mandateName, footerNote, rawHtmlBlock)
    messaging = getMessagingInterface()

    successCount = 0
    for to in recipients:
        try:
            ok = messaging.send(
                channel=MessagingChannel.EMAIL,
                recipient=to,
                subject=subject,
                message=htmlMessage,
            )
        except Exception as e:
            # Keep going: one failing address must not block the remaining admins.
            logger.error("notifyMandateAdmins: error sending to %s: %s", to, e)
            continue
        if ok:
            successCount += 1
        else:
            logger.warning("notifyMandateAdmins: send failed for %s", to)

    logger.info(
        "notifyMandateAdmins: sent '%s' to %d/%d recipients for mandate %s (%s)",
        subject, successCount, len(recipients), mandateId, mandateName,
    )
    return successCount