285 lines
10 KiB
Python
285 lines
10 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Central notification utility for mandate administrators.
|
|
|
|
All mandate-level notifications (subscription changes, billing warnings, etc.)
|
|
MUST go through notifyMandateAdmins() to ensure consistent recipient resolution
|
|
and delivery.
|
|
|
|
Recipients are the union of:
|
|
1. BillingSettings.notifyEmails for the mandate (configured contact addresses)
|
|
2. All users with the mandate-level "admin" RBAC role
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import html
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, List, Optional, Set
|
|
|
|
from modules.datamodels.datamodelMessaging import MessagingChannel
|
|
from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# Recipient resolution
|
|
# ============================================================================
|
|
|
|
|
|
def _normalizeEmailList(raw: Any) -> List[str]:
|
|
"""Parse notifyEmails which can be a list, JSON string, or single address."""
|
|
if raw is None:
|
|
return []
|
|
if isinstance(raw, list):
|
|
return [str(e).strip().lower() for e in raw if str(e).strip()]
|
|
if isinstance(raw, str):
|
|
s = raw.strip()
|
|
if not s:
|
|
return []
|
|
if s.startswith("["):
|
|
try:
|
|
parsed = json.loads(s)
|
|
if isinstance(parsed, list):
|
|
return [str(e).strip().lower() for e in parsed if str(e).strip()]
|
|
except Exception:
|
|
pass
|
|
return [s.lower()]
|
|
return []
|
|
|
|
|
|
def _resolveMandateContactEmails(mandateId: str) -> List[str]:
    """Return the contact addresses stored in the mandate's billing settings.

    Reads the ``notifyEmails`` entry of the settings returned by the billing
    interface (opened with the root user) and normalizes it via
    ``_normalizeEmailList``. Any failure is logged as a warning and results
    in an empty list.
    """
    try:
        from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
        from modules.security.rootAccess import getRootUser

        billing = getBillingInterface(getRootUser(), mandateId)
        billingSettings = billing.getSettings(mandateId)
        if not billingSettings:
            # No settings stored: behave as if nothing is configured.
            billingSettings = {}
        configured = billingSettings.get("notifyEmails")
        return _normalizeEmailList(configured)
    except Exception as e:
        logger.warning("Could not resolve BillingSettings.notifyEmails for mandate %s: %s", mandateId, e)
        return []
|
|
|
|
|
|
def _resolveMandateAdminEmails(mandateId: str) -> List[str]:
    """Resolve all admin users of a mandate via RBAC and return their emails.

    A user counts as mandate admin when one of the roles attached to their
    (enabled) user-mandate membership has ``roleLabel == "admin"`` and no
    ``featureInstanceId``. Returned addresses are stripped and lower-cased.

    The function is best-effort and never raises:
      * if the membership list cannot be loaded, a warning is logged and an
        empty list is returned;
      * if resolving a single membership fails, a warning is logged for that
        membership and the remaining memberships are still processed, so one
        broken record cannot suppress notifications to all other admins.
    """
    emails: List[str] = []
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface

        rootIf = getRootInterface()
        # Materialize inside the try so that a lazily evaluated result cannot
        # raise later, outside of the error handling.
        userMandates = list(rootIf.getUserMandatesByMandate(mandateId) or [])
    except Exception as e:
        logger.warning("Could not resolve admin emails for mandate %s: %s", mandateId, e)
        return emails

    # The same role is typically shared by several users; look each one up
    # only once per call.
    adminRoleCache: Dict[str, bool] = {}

    def _isMandateAdminRole(roleId: Any) -> bool:
        cacheKey = str(roleId)
        if cacheKey not in adminRoleCache:
            role = rootIf.getRole(roleId)
            adminRoleCache[cacheKey] = bool(
                role and role.roleLabel == "admin" and not role.featureInstanceId
            )
        return adminRoleCache[cacheKey]

    for um in userMandates:
        try:
            if not getattr(um, "enabled", True):
                continue
            userId = getattr(um, "userId", None)
            if not userId:
                continue
            umId = str(getattr(um, "id", ""))
            roleIds = rootIf.getRoleIdsForUserMandate(umId)
            if not any(_isMandateAdminRole(roleId) for roleId in roleIds):
                continue
            user = rootIf.getUser(str(userId))
            if user and user.email:
                email = user.email.strip().lower()
                if email:
                    emails.append(email)
        except Exception as e:
            # Isolate the failure to this membership and keep going.
            logger.warning(
                "Could not resolve admin email for user-mandate %s of mandate %s: %s",
                getattr(um, "id", "?"), mandateId, e,
            )
    return emails
|
|
|
|
|
|
def _resolveAllRecipients(mandateId: str) -> List[str]:
    """Return every notification recipient of a mandate exactly once.

    Combines the configured contact addresses with the mandate admin
    addresses (contacts first), drops empty entries and removes duplicates
    while keeping the order of first appearance.
    """
    contactEmails = _resolveMandateContactEmails(mandateId)
    adminEmails = _resolveMandateAdminEmails(mandateId)
    # dict keys keep insertion order, so fromkeys() de-duplicates while
    # preserving the first occurrence of each address.
    return list(dict.fromkeys(address for address in contactEmails + adminEmails if address))
|
|
|
|
|
|
# ============================================================================
|
|
# Mandate name resolution
|
|
# ============================================================================
|
|
|
|
|
|
def _resolveMandateName(mandateId: str) -> str:
    """Look up a display name for the mandate.

    Uses the record's ``label``, then its ``name``; when neither is set, no
    record is found, or the lookup fails (logged as a warning), the first
    eight characters of ``mandateId`` are returned instead.
    """
    try:
        from modules.datamodels.datamodelUam import Mandate
        from modules.security.rootAccess import getRootDbAppConnector

        matches = getRootDbAppConnector().getRecordset(Mandate, recordFilter={"id": mandateId})
        if matches:
            record = matches[0]
            return record.get("label") or record.get("name") or mandateId[:8]
    except Exception as e:
        logger.warning("Could not resolve mandate name for %s: %s", mandateId, e)
    return mandateId[:8]
|
|
|
|
|
|
# ============================================================================
|
|
# HTML email rendering
|
|
# ============================================================================
|
|
|
|
|
|
def _getOperatorInfo() -> Dict[str, str]:
|
|
"""Load operator company data from config.ini."""
|
|
try:
|
|
from modules.shared.configuration import APP_CONFIG
|
|
return {
|
|
"companyName": APP_CONFIG.get("Operator_CompanyName", ""),
|
|
"address": APP_CONFIG.get("Operator_Address", ""),
|
|
"vatNumber": APP_CONFIG.get("Operator_VatNumber", ""),
|
|
}
|
|
except Exception:
|
|
return {"companyName": "", "address": "", "vatNumber": ""}
|
|
|
|
|
|
def _renderHtmlEmail(
    headline: str,
    bodyParagraphs: List[str],
    mandateName: str,
    footerNote: Optional[str] = None,
    rawHtmlBlock: Optional[str] = None,
) -> str:
    """Build the complete HTML document for a notification email.

    Args:
        headline: Heading shown at the top of the body (HTML-escaped).
        bodyParagraphs: Plain-text paragraphs; each is HTML-escaped and its
            newlines are turned into ``<br>``.
        mandateName: Mandate name shown below the headline (HTML-escaped).
        footerNote: Optional small-print note below the content (HTML-escaped).
        rawHtmlBlock: Optional pre-formatted HTML inserted after the
            paragraphs (e.g. invoice table). Inserted as-is, NOT escaped.

    Returns:
        The rendered HTML document as a string.
    """
    safeHeadline = html.escape(headline)
    safeMandate = html.escape(mandateName)

    # One <p> per paragraph, each terminated by a newline.
    paragraphMarkup = "".join(
        '<p style="margin: 0 0 14px 0; color: #333333;">'
        + html.escape(paragraph).replace("\n", "<br>")
        + "</p>\n"
        for paragraph in bodyParagraphs
    )

    # Caller-supplied HTML is trusted and embedded without escaping.
    rawMarkup = f'<div style="margin: 16px 0;">{rawHtmlBlock}</div>\n' if rawHtmlBlock else ""

    footerMarkup = (
        '<p style="margin: 16px 0 0 0; font-size: 13px; color: #888888;">'
        + html.escape(footerNote)
        + "</p>\n"
        if footerNote
        else ""
    )

    # Operator line: only the non-empty company details, pipe-separated.
    operator = _getOperatorInfo()
    operatorDetails = [
        operator[key] for key in ("companyName", "address", "vatNumber") if operator[key]
    ]
    operatorMarkup = (
        '<p style="margin: 4px 0 0 0; font-size: 11px; color: #b0b0b0; text-align: center;">'
        + html.escape(" | ".join(operatorDetails))
        + "</p>\n"
        if operatorDetails
        else ""
    )

    return f"""<!DOCTYPE html>
<html lang="de">
<head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body style="margin: 0; padding: 0; background-color: #f4f4f7; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif;">
<table role="presentation" width="100%" cellpadding="0" cellspacing="0" style="background-color: #f4f4f7; padding: 32px 16px;">
<tr><td align="center">
<table role="presentation" width="560" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,0.08);">
<!-- Header -->
<tr><td style="background-color: #1a1a2e; padding: 24px 32px;">
<h1 style="margin: 0; font-size: 18px; font-weight: 600; color: #ffffff;">PowerOn</h1>
</td></tr>
<!-- Body -->
<tr><td style="padding: 32px;">
<h2 style="margin: 0 0 8px 0; font-size: 20px; font-weight: 600; color: #1a1a2e;">{safeHeadline}</h2>
<p style="margin: 0 0 24px 0; font-size: 14px; color: #6b7280;">Mandant: <strong>{safeMandate}</strong></p>
<div style="font-size: 15px; line-height: 1.6;">
{paragraphMarkup}
{rawMarkup}
</div>
{footerMarkup}
</td></tr>
<!-- Footer -->
<tr><td style="padding: 16px 32px; background-color: #f9fafb; border-top: 1px solid #e5e7eb;">
<p style="margin: 0; font-size: 12px; color: #9ca3af; text-align: center;">
Diese E-Mail wurde automatisch von PowerOn versendet.
</p>
{operatorMarkup}
</td></tr>
</table>
</td></tr>
</table>
</body>
</html>"""
|
|
|
|
|
|
# ============================================================================
|
|
# Public API
|
|
# ============================================================================
|
|
|
|
|
|
def notifyMandateAdmins(
    mandateId: str,
    subject: str,
    headline: str,
    bodyParagraphs: List[str],
    *,
    footerNote: Optional[str] = None,
    rawHtmlBlock: Optional[str] = None,
) -> int:
    """
    Email a styled HTML notification to every recipient of a mandate.

    Recipients are the configured contact addresses plus the mandate admins.
    The same rendered message is sent to each recipient individually; a
    failed or raising send for one recipient is logged and does not stop
    delivery to the others.

    Args:
        mandateId: The mandate to notify admins for. Falsy -> nothing is sent.
        subject: Email subject line.
        headline: Bold headline inside the email body.
        bodyParagraphs: List of paragraph strings (plain text, auto-escaped).
        footerNote: Optional small-print note below the main content.
        rawHtmlBlock: Optional pre-formatted HTML block (e.g. invoice summary table).

    Returns:
        Number of recipients that were successfully notified (0 when there
        is no mandate ID or no recipient could be resolved).
    """
    if not mandateId:
        return 0

    recipients = _resolveAllRecipients(mandateId)
    if not recipients:
        logger.warning(
            "notifyMandateAdmins: no recipients found for mandate %s "
            "(no notifyEmails configured and no admin users with email)",
            mandateId,
        )
        return 0

    mandateName = _resolveMandateName(mandateId)
    # Render once; every recipient receives the identical message.
    renderedHtml = _renderHtmlEmail(headline, bodyParagraphs, mandateName, footerNote, rawHtmlBlock)
    messaging = getMessagingInterface()

    deliveredCount = 0
    for address in recipients:
        try:
            delivered = messaging.send(
                channel=MessagingChannel.EMAIL,
                recipient=address,
                subject=subject,
                message=renderedHtml,
            )
            if not delivered:
                logger.warning("notifyMandateAdmins: send failed for %s", address)
                continue
            deliveredCount += 1
        except Exception as e:
            logger.error("notifyMandateAdmins: error sending to %s: %s", address, e)

    logger.info(
        "notifyMandateAdmins: sent '%s' to %d/%d recipients for mandate %s (%s)",
        subject, deliveredCount, len(recipients), mandateId, mandateName,
    )
    return deliveredCount
|