# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Central notification utility for mandate administrators. All mandate-level notifications (subscription changes, billing warnings, etc.) MUST go through notifyMandateAdmins() to ensure consistent recipient resolution and delivery. Recipients: all users with the mandate-level "admin" RBAC role. """ from __future__ import annotations import html import json import logging from typing import Any, Dict, List, Optional, Set from modules.datamodels.datamodelMessaging import MessagingChannel from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface logger = logging.getLogger(__name__) # ============================================================================ # Recipient resolution # ============================================================================ def _normalizeEmailList(raw: Any) -> List[str]: """Parse notifyEmails which can be a list, JSON string, or single address.""" if raw is None: return [] if isinstance(raw, list): return [str(e).strip().lower() for e in raw if str(e).strip()] if isinstance(raw, str): s = raw.strip() if not s: return [] if s.startswith("["): try: parsed = json.loads(s) if isinstance(parsed, list): return [str(e).strip().lower() for e in parsed if str(e).strip()] except Exception: pass return [s.lower()] return [] def _resolveMandateContactEmails(mandateId: str) -> List[str]: """Get the configured notifyEmails from BillingSettings.""" try: from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface from modules.security.rootAccess import getRootUser billingIf = getBillingInterface(getRootUser(), mandateId) settings = billingIf.getSettings(mandateId) or {} return _normalizeEmailList(settings.get("notifyEmails")) except Exception as e: logger.warning("Could not resolve BillingSettings.notifyEmails for mandate %s: %s", mandateId, e) return [] def _resolveMandateAdminEmails(mandateId: str) -> List[str]: """Resolve all admin users of a mandate via RBAC and return their emails.""" emails: List[str] = [] try: from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() userMandates = rootIf.getUserMandatesByMandate(mandateId) for um in userMandates: if not getattr(um, "enabled", True): continue umId = str(getattr(um, "id", "")) userId = getattr(um, "userId", None) if not userId: continue roleIds = rootIf.getRoleIdsForUserMandate(umId) isAdmin = False for roleId in roleIds: role = rootIf.getRole(roleId) if role and role.roleLabel == "admin" and not role.featureInstanceId: isAdmin = True break if not isAdmin: continue user = rootIf.getUser(str(userId)) if user and user.email: emails.append(user.email.strip().lower()) except Exception as e: logger.warning("Could not resolve admin emails for mandate %s: %s", mandateId, e) return emails def _resolveAllRecipients(mandateId: str) -> List[str]: """Mandate admin user emails only (RBAC-resolved), deduplicated.""" seen: Set[str] = set() result: List[str] = [] for email in _resolveMandateAdminEmails(mandateId): if email and email not in seen: seen.add(email) result.append(email) return result # ============================================================================ # Mandate name resolution # ============================================================================ def resolveMandateName(mandateId: str) -> str: """Return the human-readable mandate name (label or name), falling back to a short ID.""" try: from modules.datamodels.datamodelUam import Mandate from modules.security.rootAccess import getRootDbAppConnector appDb = getRootDbAppConnector() rows = appDb.getRecordset(Mandate, recordFilter={"id": mandateId}) if rows: return rows[0].get("label") or rows[0].get("name") or mandateId[:8] except Exception as e: logger.warning("Could not resolve mandate name for %s: %s", mandateId, e) return mandateId[:8] # ============================================================================ # HTML email rendering # ============================================================================ def buildBillingAdminUrl(frontendUrl: str) -> str: """Absolute URL to /billing/admin for the given frontend base (no trailing slash on base).""" base = (frontendUrl or "").strip().rstrip("/") if not base: return "" return f"{base}/billing/admin" def buildActionLinkHtmlBlock(url: str, buttonText: str) -> str: """Render a CTA button plus the same URL as a plain link (auth-email style).""" if not url or not buttonText: return "" escaped_url = html.escape(url) escaped_label = html.escape(buttonText) return f'''
{escaped_url}
''' def buildBillingAdminActionBlock( frontendUrl: str, buttonText: str = "Billing-Verwaltung öffnen", ) -> str: """CTA block linking to /billing/admin (auth-email style button + plain link).""" admin_url = buildBillingAdminUrl(frontendUrl) if not admin_url: return "" return buildActionLinkHtmlBlock(admin_url, buttonText) def _resolveBillingAdminFrontendUrl(frontendUrl: Optional[str]) -> str: """Explicit frontendUrl param, else per-request value from middleware.""" explicit = (frontendUrl or "").strip().rstrip("/") if explicit: return explicit try: from modules.shared.requestFrontendUrl import getRequestFrontendUrl return getRequestFrontendUrl() except Exception: return "" def _getOperatorInfo() -> Dict[str, str]: """Load operator company data from config.ini.""" try: from modules.shared.configuration import APP_CONFIG return { "companyName": APP_CONFIG.get("Operator_CompanyName", ""), "address": APP_CONFIG.get("Operator_Address", ""), "vatNumber": APP_CONFIG.get("Operator_VatNumber", ""), } except Exception: return {"companyName": "", "address": "", "vatNumber": ""} def renderHtmlEmail( headline: str, bodyParagraphs: List[str], mandateName: str, footerNote: Optional[str] = None, rawHtmlBlock: Optional[str] = None, actionHtmlBlock: Optional[str] = None, ) -> str: """Render a clean, professional HTML notification email. Args: rawHtmlBlock: Optional pre-formatted HTML inserted after bodyParagraphs (e.g. invoice table). actionHtmlBlock: Optional CTA block (e.g. button + link to billing admin). """ hl = html.escape(headline) mn = html.escape(mandateName) paragraphsHtml = "" for p in bodyParagraphs: escaped = html.escape(p).replace("\n", "{escaped}
\n' rawBlock = "" if rawHtmlBlock: rawBlock = f'' f'{html.escape(footerNote)}
\n' ) operator = _getOperatorInfo() operatorLine = "" parts = [p for p in [operator["companyName"], operator["address"], operator["vatNumber"]] if p] if parts: operatorLine = ( f'' f'{html.escape(" | ".join(parts))}
\n' ) return f"""