From c813bd63ca8e9c425dc0d6aae30f177791eeed28 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 22 Mar 2026 17:23:54 +0100
Subject: [PATCH] subscription base logic
---
app.py | 3 +
config.ini | 5 +
modules/datamodels/datamodelBilling.py | 9 +-
modules/datamodels/datamodelSubscription.py | 235 ++++++
.../workspace/routeFeatureWorkspace.py | 13 +-
modules/interfaces/interfaceBootstrap.py | 48 ++
modules/interfaces/interfaceDbApp.py | 32 +-
modules/interfaces/interfaceDbSubscription.py | 353 +++++++++
modules/routes/routeAdminFeatures.py | 29 +-
modules/routes/routeBilling.py | 313 +++++++-
modules/routes/routeSubscription.py | 364 +++++++++
modules/serviceCenter/registry.py | 9 +-
.../services/serviceAgent/agentLoop.py | 15 +
.../services/serviceAi/mainServiceAi.py | 23 +-
.../serviceBilling/billingExhaustedNotify.py | 102 +--
.../serviceBilling/mainServiceBilling.py | 35 +
.../services/serviceBilling/stripeCheckout.py | 13 +-
.../services/serviceSubscription/__init__.py | 0
.../mainServiceSubscription.py | 710 ++++++++++++++++++
.../serviceSubscription/stripeBootstrap.py | 214 ++++++
modules/shared/configuration.py | 2 +-
modules/shared/notifyMandateAdmins.py | 285 +++++++
modules/shared/stripeClient.py | 38 +
modules/system/mainSystem.py | 1 -
24 files changed, 2722 insertions(+), 129 deletions(-)
create mode 100644 modules/datamodels/datamodelSubscription.py
create mode 100644 modules/interfaces/interfaceDbSubscription.py
create mode 100644 modules/routes/routeSubscription.py
create mode 100644 modules/serviceCenter/services/serviceSubscription/__init__.py
create mode 100644 modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
create mode 100644 modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py
create mode 100644 modules/shared/notifyMandateAdmins.py
create mode 100644 modules/shared/stripeClient.py
diff --git a/app.py b/app.py
index 0c769a2a..c1400353 100644
--- a/app.py
+++ b/app.py
@@ -601,6 +601,9 @@ app.include_router(gdprRouter)
from modules.routes.routeBilling import router as billingRouter
app.include_router(billingRouter)
+from modules.routes.routeSubscription import router as subscriptionRouter
+app.include_router(subscriptionRouter)
+
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================
diff --git a/config.ini b/config.ini
index ccd6b77e..4a37f2f8 100644
--- a/config.ini
+++ b/config.ini
@@ -44,3 +44,8 @@ Connector_StacSwisstopo_TIMEOUT = 30
Connector_StacSwisstopo_MAX_RETRIES = 3
Connector_StacSwisstopo_RETRY_DELAY = 1.0
Connector_StacSwisstopo_ENABLE_CACHE = True
+
+# Operator company information (shown on invoice emails)
+Operator_CompanyName = PowerOn AG
+Operator_Address = Birmensdorferstrasse 94, 8003 Zürich
+Operator_VatNumber = CHE491.960.195
diff --git a/modules/datamodels/datamodelBilling.py b/modules/datamodels/datamodelBilling.py
index 8ffbdef1..995ac75d 100644
--- a/modules/datamodels/datamodelBilling.py
+++ b/modules/datamodels/datamodelBilling.py
@@ -143,6 +143,9 @@ class BillingSettings(BaseModel):
)
warningThresholdPercent: float = Field(default=10.0, description="Warning threshold as percentage")
+ # Stripe
+ stripeCustomerId: Optional[str] = Field(None, description="Stripe Customer ID (cus_xxx) — one per mandate")
+
# Notifications (e.g. mandate owner / finance — also used when PREPAY_MANDATE pool is exhausted)
notifyEmails: List[str] = Field(
default_factory=list,
@@ -163,6 +166,7 @@ registerModelLabels(
"de": "Startguthaben nur Root-Mandant (CHF)",
},
"warningThresholdPercent": {"en": "Warning Threshold (%)", "de": "Warnschwelle (%)"},
+ "stripeCustomerId": {"en": "Stripe Customer ID", "de": "Stripe-Kunden-ID"},
"notifyEmails": {
"en": "Billing notification emails (owner / admin)",
"de": "E-Mails für Billing-Alerts (Inhaber/Admin)",
@@ -260,12 +264,15 @@ class BillingStatisticsResponse(BaseModel):
class BillingCheckResult(BaseModel):
- """Result of a billing balance check."""
+ """Result of a billing balance check (budget + subscription gate)."""
allowed: bool
reason: Optional[str] = None
currentBalance: Optional[float] = None
requiredAmount: Optional[float] = None
billingModel: Optional[BillingModelEnum] = None
+ upgradeRequired: Optional[bool] = None
+ subscriptionUiPath: Optional[str] = None
+ userAction: Optional[str] = None
def parseBillingModelFromStoredValue(raw: Optional[str]) -> BillingModelEnum:
diff --git a/modules/datamodels/datamodelSubscription.py b/modules/datamodels/datamodelSubscription.py
new file mode 100644
index 00000000..1c1435d8
--- /dev/null
+++ b/modules/datamodels/datamodelSubscription.py
@@ -0,0 +1,235 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""Subscription models: SubscriptionPlan (catalog), MandateSubscription (instance per mandate),
+StripePlanPrice (persisted Stripe IDs per plan).
+
+State Machine: see wiki/concepts/Subscription-State-Machine.md
+"""
+
+from typing import Dict, List, Optional
+from enum import Enum
+from datetime import datetime, timezone
+from pydantic import BaseModel, Field
+from modules.shared.attributeUtils import registerModelLabels
+import uuid
+
+
+class SubscriptionStatusEnum(str, Enum):
+ """Lifecycle status of a mandate subscription.
+ See wiki/concepts/Subscription-State-Machine.md for transition rules."""
+ PENDING = "PENDING"
+ SCHEDULED = "SCHEDULED"
+ TRIALING = "TRIALING"
+ ACTIVE = "ACTIVE"
+ PAST_DUE = "PAST_DUE"
+ EXPIRED = "EXPIRED"
+
+
+TERMINAL_STATUSES = {SubscriptionStatusEnum.EXPIRED}
+OPERATIVE_STATUSES = {SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.TRIALING, SubscriptionStatusEnum.PAST_DUE}
+
+ALLOWED_TRANSITIONS = {
+ (SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.ACTIVE),
+ (SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.SCHEDULED),
+ (SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.EXPIRED),
+ (SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.ACTIVE),
+ (SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.EXPIRED),
+ (SubscriptionStatusEnum.TRIALING, SubscriptionStatusEnum.EXPIRED),
+ (SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.PAST_DUE),
+ (SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.EXPIRED),
+ (SubscriptionStatusEnum.PAST_DUE, SubscriptionStatusEnum.ACTIVE),
+ (SubscriptionStatusEnum.PAST_DUE, SubscriptionStatusEnum.EXPIRED),
+}
+
+
+class BillingPeriodEnum(str, Enum):
+ """Recurring billing interval."""
+ MONTHLY = "MONTHLY"
+ YEARLY = "YEARLY"
+ NONE = "NONE"
+
+
+# ============================================================================
+# Catalog: SubscriptionPlan (static, in-memory)
+# ============================================================================
+
+class SubscriptionPlan(BaseModel):
+ """Plan definition (catalog entry). Not stored per mandate — static."""
+ planKey: str = Field(..., description="Unique plan identifier")
+ selectableByUser: bool = Field(default=True, description="Whether users can choose this plan in the UI")
+
+ title: Dict[str, str] = Field(default_factory=dict, description="Multilingual title (en/de/fr)")
+ description: Dict[str, str] = Field(default_factory=dict, description="Multilingual description")
+
+ currency: str = Field(default="CHF", description="Billing currency")
+ billingPeriod: BillingPeriodEnum = Field(default=BillingPeriodEnum.MONTHLY, description="Recurring interval")
+ pricePerUserCHF: float = Field(default=0.0, description="Price per active user per period")
+ pricePerFeatureInstanceCHF: float = Field(default=0.0, description="Price per active feature instance per period")
+ autoRenew: bool = Field(default=True, description="Stripe renews automatically at period end")
+
+ maxUsers: Optional[int] = Field(None, description="Hard cap on active users (None = unlimited)")
+ maxFeatureInstances: Optional[int] = Field(None, description="Hard cap on active feature instances (None = unlimited)")
+ trialDays: Optional[int] = Field(None, description="Trial duration in days (only for trial plans)")
+ successorPlanKey: Optional[str] = Field(None, description="Plan to transition to when trial ends")
+
+
+registerModelLabels(
+ "SubscriptionPlan",
+ {"en": "Subscription Plan", "de": "Abonnement-Plan", "fr": "Plan d'abonnement"},
+ {
+ "planKey": {"en": "Plan", "de": "Plan", "fr": "Plan"},
+ "selectableByUser": {"en": "Selectable", "de": "Wählbar", "fr": "Sélectionnable"},
+ "billingPeriod": {"en": "Billing Period", "de": "Abrechnungszeitraum", "fr": "Période de facturation"},
+ "pricePerUserCHF": {"en": "Price per User (CHF)", "de": "Preis pro User (CHF)"},
+ "pricePerFeatureInstanceCHF": {"en": "Price per Instance (CHF)", "de": "Preis pro Instanz (CHF)"},
+ "maxUsers": {"en": "Max Users", "de": "Max. Benutzer", "fr": "Max. utilisateurs"},
+ "maxFeatureInstances": {"en": "Max Instances", "de": "Max. Instanzen", "fr": "Max. instances"},
+ },
+)
+
+
+# ============================================================================
+# Stripe Price mapping (persisted in DB, auto-created at bootstrap)
+# ============================================================================
+
+class StripePlanPrice(BaseModel):
+ """Persisted mapping from planKey to Stripe Product/Price IDs.
+ Auto-created at startup — no manual configuration needed.
+ Uses separate Stripe Products for users and instances for clear invoice labels."""
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
+ planKey: str = Field(..., description="Reference to SubscriptionPlan.planKey")
+ stripeProductId: str = Field("", description="Legacy single-product ID (unused)")
+ stripeProductIdUsers: Optional[str] = Field(None, description="Stripe Product ID for user licenses")
+ stripeProductIdInstances: Optional[str] = Field(None, description="Stripe Product ID for feature instances")
+ stripePriceIdUsers: Optional[str] = Field(None, description="Stripe Price ID for user-seat line item")
+ stripePriceIdInstances: Optional[str] = Field(None, description="Stripe Price ID for instance line item")
+
+
+registerModelLabels(
+ "StripePlanPrice",
+ {"en": "Stripe Plan Prices", "de": "Stripe-Planpreise"},
+ {
+ "planKey": {"en": "Plan", "de": "Plan"},
+ "stripeProductIdUsers": {"en": "Product (Users)", "de": "Produkt (User)"},
+ "stripeProductIdInstances": {"en": "Product (Instances)", "de": "Produkt (Instanzen)"},
+ "stripePriceIdUsers": {"en": "Price ID (Users)", "de": "Preis-ID (User)"},
+ "stripePriceIdInstances": {"en": "Price ID (Instances)", "de": "Preis-ID (Instanzen)"},
+ },
+)
+
+
+# ============================================================================
+# Instance: MandateSubscription
+# ============================================================================
+
+class MandateSubscription(BaseModel):
+ """A subscription instance bound to a specific mandate.
+ See wiki/concepts/Subscription-State-Machine.md for state transitions."""
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
+ mandateId: str = Field(..., description="Foreign key to Mandate")
+ planKey: str = Field(..., description="Reference to SubscriptionPlan.planKey")
+
+ status: SubscriptionStatusEnum = Field(default=SubscriptionStatusEnum.PENDING, description="Current lifecycle status")
+ recurring: bool = Field(default=True, description="True: auto-renews at period end. False: expires at period end (gekuendigt).")
+
+ startedAt: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="Record creation timestamp")
+ effectiveFrom: Optional[datetime] = Field(None, description="When this subscription becomes operative. None = immediate. Set for SCHEDULED subs.")
+ endedAt: Optional[datetime] = Field(None, description="When subscription ended (terminal)")
+ currentPeriodStart: Optional[datetime] = Field(None, description="Current billing period start (synced from Stripe)")
+ currentPeriodEnd: Optional[datetime] = Field(None, description="Current billing period end (synced from Stripe)")
+ trialEndsAt: Optional[datetime] = Field(None, description="Trial expiry timestamp")
+
+ snapshotPricePerUserCHF: float = Field(default=0.0, description="Price snapshot at activation (for invoice history)")
+ snapshotPricePerInstanceCHF: float = Field(default=0.0, description="Price snapshot at activation")
+
+ stripeSubscriptionId: Optional[str] = Field(None, description="Stripe Subscription ID (sub_xxx)")
+ stripeItemIdUsers: Optional[str] = Field(None, description="Stripe Subscription Item ID for user seats")
+ stripeItemIdInstances: Optional[str] = Field(None, description="Stripe Subscription Item ID for feature instances")
+
+
+registerModelLabels(
+ "MandateSubscription",
+ {"en": "Mandate Subscription", "de": "Mandanten-Abonnement", "fr": "Abonnement du mandat"},
+ {
+ "id": {"en": "ID", "de": "ID"},
+ "mandateId": {"en": "Mandate ID", "de": "Mandanten-ID"},
+ "planKey": {"en": "Plan", "de": "Plan"},
+ "status": {"en": "Status", "de": "Status"},
+ "recurring": {"en": "Recurring", "de": "Wiederkehrend"},
+ "startedAt": {"en": "Started", "de": "Gestartet"},
+ "effectiveFrom": {"en": "Effective From", "de": "Wirksam ab"},
+ "endedAt": {"en": "Ended", "de": "Beendet"},
+ "currentPeriodStart": {"en": "Period Start", "de": "Periodenbeginn"},
+ "currentPeriodEnd": {"en": "Period End", "de": "Periodenende"},
+ "trialEndsAt": {"en": "Trial Ends", "de": "Trial endet"},
+ "snapshotPricePerUserCHF": {"en": "Price/User (CHF)", "de": "Preis/User (CHF)"},
+ "snapshotPricePerInstanceCHF": {"en": "Price/Instance (CHF)", "de": "Preis/Instanz (CHF)"},
+ },
+)
+
+
+# ============================================================================
+# Built-in plan catalog (static, no env dependency)
+# ============================================================================
+
+BUILTIN_PLANS: Dict[str, SubscriptionPlan] = {
+ "ROOT": SubscriptionPlan(
+ planKey="ROOT",
+ selectableByUser=False,
+ title={"en": "Root (System)", "de": "Root (System)", "fr": "Root (Système)"},
+ description={"en": "Internal system plan — no billing.", "de": "Interner Systemplan — keine Verrechnung."},
+ billingPeriod=BillingPeriodEnum.NONE,
+ autoRenew=False,
+ maxUsers=None,
+ maxFeatureInstances=None,
+ ),
+ "TRIAL_7D": SubscriptionPlan(
+ planKey="TRIAL_7D",
+ selectableByUser=False,
+ title={"en": "Free Trial (7 days)", "de": "Gratis-Testphase (7 Tage)", "fr": "Essai gratuit (7 jours)"},
+ description={
+ "en": "Try the platform for 7 days — 1 user, up to 3 feature instances.",
+ "de": "Plattform 7 Tage testen — 1 User, bis zu 3 Feature-Instanzen.",
+ },
+ billingPeriod=BillingPeriodEnum.NONE,
+ autoRenew=False,
+ maxUsers=1,
+ maxFeatureInstances=3,
+ trialDays=7,
+ successorPlanKey="STANDARD_MONTHLY",
+ ),
+ "STANDARD_MONTHLY": SubscriptionPlan(
+ planKey="STANDARD_MONTHLY",
+ selectableByUser=True,
+ title={"en": "Standard (Monthly)", "de": "Standard (Monatlich)", "fr": "Standard (Mensuel)"},
+ description={
+ "en": "Usage-based billing per active user and feature instance, billed monthly.",
+ "de": "Nutzungsbasierte Abrechnung pro aktivem User und Feature-Instanz, monatlich.",
+ },
+ billingPeriod=BillingPeriodEnum.MONTHLY,
+ pricePerUserCHF=90.0,
+ pricePerFeatureInstanceCHF=150.0,
+ ),
+ "STANDARD_YEARLY": SubscriptionPlan(
+ planKey="STANDARD_YEARLY",
+ selectableByUser=True,
+ title={"en": "Standard (Yearly)", "de": "Standard (Jährlich)", "fr": "Standard (Annuel)"},
+ description={
+ "en": "Usage-based billing per active user and feature instance, billed yearly.",
+ "de": "Nutzungsbasierte Abrechnung pro aktivem User und Feature-Instanz, jährlich.",
+ },
+ billingPeriod=BillingPeriodEnum.YEARLY,
+ pricePerUserCHF=1080.0,
+ pricePerFeatureInstanceCHF=1800.0,
+ ),
+}
+
+
+def _getPlan(planKey: str) -> Optional[SubscriptionPlan]:
+ """Resolve a plan by key from the built-in catalog."""
+ return BUILTIN_PLANS.get(planKey)
+
+
+def _getSelectablePlans() -> List[SubscriptionPlan]:
+ """Return plans that users can choose in the UI."""
+ return [p for p in BUILTIN_PLANS.values() if p.selectableByUser]
diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py
index d0dd22da..6dc7774e 100644
--- a/modules/features/workspace/routeFeatureWorkspace.py
+++ b/modules/features/workspace/routeFeatureWorkspace.py
@@ -19,6 +19,9 @@ from modules.auth import limiter, getRequestContext, RequestContext
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
InsufficientBalanceException,
)
+from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ SubscriptionInactiveException,
+)
from modules.interfaces import interfaceDbChat, interfaceDbManagement
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.serviceCenter.core.serviceStreaming import get_event_manager
@@ -803,7 +806,15 @@ async def _runWorkspaceAgent(
})
except Exception as e:
- if isinstance(e, InsufficientBalanceException):
+ if isinstance(e, SubscriptionInactiveException):
+ logger.warning(f"Workspace blocked by subscription: {e.message}")
+ await eventManager.emit_event(queueId, "error", {
+ "type": "error",
+ "content": e.message,
+ "workflowId": workflowId,
+ "item": e.toClientDict(),
+ })
+ elif isinstance(e, InsufficientBalanceException):
logger.warning(f"Workspace blocked by billing: {e.message}")
await eventManager.emit_event(queueId, "error", {
"type": "error",
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index c1cac9ef..4a3881f5 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -103,6 +103,13 @@ def initBootstrap(db: DatabaseConnector) -> None:
if mandateId:
initRootMandateBilling(mandateId)
+ # Initialize subscription for root mandate
+ if mandateId:
+ _initRootMandateSubscription(mandateId)
+
+ # Auto-provision Stripe Products/Prices for paid plans (idempotent)
+ _bootstrapStripePrices()
+
def initAutomationTemplates(dbApp: DatabaseConnector, adminUserId: Optional[str] = None) -> None:
"""
@@ -2069,6 +2076,47 @@ def initRootMandateBilling(mandateId: str) -> None:
logger.warning(f"Failed to initialize root mandate billing (non-critical): {e}")
+def _initRootMandateSubscription(mandateId: str) -> None:
+ """
+ Ensure the root mandate has an active ROOT subscription.
+ Called during bootstrap after billing init.
+ """
+ try:
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as getSubRootInterface
+ from modules.datamodels.datamodelSubscription import (
+ MandateSubscription,
+ SubscriptionStatusEnum,
+ )
+
+ subInterface = getSubRootInterface()
+ existing = subInterface.getOperativeForMandate(mandateId)
+ if existing:
+ logger.info("Root mandate subscription already exists")
+ return
+
+ sub = MandateSubscription(
+ mandateId=mandateId,
+ planKey="ROOT",
+ status=SubscriptionStatusEnum.ACTIVE,
+ recurring=False,
+ )
+ subInterface.createSubscription(sub)
+ logger.info("Created ROOT subscription for root mandate")
+
+ except Exception as e:
+ logger.warning(f"Failed to initialize root mandate subscription (non-critical): {e}")
+
+
+def _bootstrapStripePrices() -> None:
+ """Auto-create Stripe Products and Prices for all paid plans.
+ Idempotent — safe on every startup. IDs are persisted in the StripePlanPrice table."""
+ try:
+ from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import bootstrapStripePrices
+ bootstrapStripePrices()
+ except Exception as e:
+ logger.error(f"Stripe price bootstrap failed (subscriptions will not work for paid plans): {e}")
+
+
def assignInitialUserMemberships(
db: DatabaseConnector,
mandateId: str,
diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py
index 2fec872d..092fd589 100644
--- a/modules/interfaces/interfaceDbApp.py
+++ b/modules/interfaces/interfaceDbApp.py
@@ -1615,7 +1615,10 @@ class AppObjects:
existing = self.getUserMandate(userId, mandateId)
if existing:
raise ValueError(f"User {userId} is already member of mandate {mandateId}")
-
+
+ # Subscription capacity check (before insert)
+ self._checkSubscriptionCapacity(mandateId, "users", delta=1)
+
# Create UserMandate
userMandate = UserMandate(
userId=userId,
@@ -1636,7 +1639,10 @@ class AppObjects:
# Create billing account for user if billing is configured
self._ensureUserBillingAccount(userId, mandateId)
-
+
+ # Sync Stripe quantity after successful insert
+ self._syncSubscriptionQuantity(mandateId)
+
cleanedRecord = {k: v for k, v in createdRecord.items() if not k.startswith("_")}
return UserMandate(**cleanedRecord)
except Exception as e:
@@ -1686,6 +1692,28 @@ class AppObjects:
except Exception as e:
logger.warning(f"Failed to create billing account for user {userId} (non-critical): {e}")
+ def _checkSubscriptionCapacity(self, mandateId: str, resourceType: str, delta: int = 1) -> None:
+ """Check subscription capacity before creating a resource. Raises on cap violation."""
+ try:
+ from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface
+ from modules.security.rootAccess import getRootUser
+ subIf = getSubInterface(getRootUser(), mandateId)
+ subIf.assertCapacity(mandateId, resourceType, delta)
+ except Exception as e:
+ if "SubscriptionCapacityException" in type(e).__name__:
+ raise
+ logger.debug(f"Subscription capacity check skipped: {e}")
+
+ def _syncSubscriptionQuantity(self, mandateId: str) -> None:
+ """Sync Stripe subscription quantities after a resource mutation."""
+ try:
+ from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface
+ from modules.security.rootAccess import getRootUser
+ subIf = getSubInterface(getRootUser(), mandateId)
+ subIf.syncQuantityToStripe(mandateId)
+ except Exception as e:
+ logger.debug(f"Subscription quantity sync skipped: {e}")
+
def deleteUserMandate(self, userId: str, mandateId: str) -> bool:
"""
Delete a UserMandate record (remove user from mandate).
diff --git a/modules/interfaces/interfaceDbSubscription.py b/modules/interfaces/interfaceDbSubscription.py
new file mode 100644
index 00000000..f08025ea
--- /dev/null
+++ b/modules/interfaces/interfaceDbSubscription.py
@@ -0,0 +1,353 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Interface for Subscription operations — ID-based, deterministic.
+
+Every write operation takes an explicit subscriptionId.
+No status-scan guessing. See wiki/concepts/Subscription-State-Machine.md.
+"""
+
+import logging
+from typing import Dict, Any, List, Optional
+from datetime import datetime, timezone
+
+from modules.connectors.connectorDbPostgre import DatabaseConnector
+from modules.shared.configuration import APP_CONFIG
+from modules.datamodels.datamodelUam import User
+from modules.datamodels.datamodelMembership import UserMandate
+from modules.datamodels.datamodelSubscription import (
+ SubscriptionPlan,
+ MandateSubscription,
+ SubscriptionStatusEnum,
+ BillingPeriodEnum,
+ ALLOWED_TRANSITIONS,
+ TERMINAL_STATUSES,
+ OPERATIVE_STATUSES,
+ BUILTIN_PLANS,
+ _getPlan,
+ _getSelectablePlans,
+)
+
+logger = logging.getLogger(__name__)
+
+SUBSCRIPTION_DATABASE = "poweron_billing"
+
+_subscriptionInterfaces: Dict[str, "SubscriptionObjects"] = {}
+
+
+class InvalidTransitionError(Exception):
+ """Raised when a state transition is not allowed by the state machine."""
+ def __init__(self, subscriptionId: str, fromStatus: str, toStatus: str):
+ self.subscriptionId = subscriptionId
+ self.fromStatus = fromStatus
+ self.toStatus = toStatus
+ super().__init__(f"Invalid transition {fromStatus} -> {toStatus} for subscription {subscriptionId}")
+
+
+def getInterface(currentUser: User, mandateId: str = None) -> "SubscriptionObjects":
+ cacheKey = f"{currentUser.id}_{mandateId}"
+ if cacheKey not in _subscriptionInterfaces:
+ _subscriptionInterfaces[cacheKey] = SubscriptionObjects(currentUser, mandateId)
+ else:
+ _subscriptionInterfaces[cacheKey].setUserContext(currentUser, mandateId)
+ return _subscriptionInterfaces[cacheKey]
+
+
+def _getRootInterface() -> "SubscriptionObjects":
+ from modules.security.rootAccess import getRootUser
+ return SubscriptionObjects(getRootUser(), mandateId=None)
+
+
+def _getAppDatabaseConnector() -> DatabaseConnector:
+ return DatabaseConnector(
+ dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_app"),
+ dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+ dbPort=int(APP_CONFIG.get("DB_PORT", "5432")),
+ dbUser=APP_CONFIG.get("DB_USER"),
+ dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+ )
+
+
+class SubscriptionObjects:
+ """Interface for subscription operations: CRUD, gate checks, Stripe sync.
+ All writes are ID-based. All status changes go through transitionStatus()."""
+
+ def __init__(self, currentUser: Optional[User] = None, mandateId: str = None):
+ self.currentUser = currentUser
+ self.userId = currentUser.id if currentUser else None
+ self.mandateId = mandateId
+ self.db = DatabaseConnector(
+ dbDatabase=SUBSCRIPTION_DATABASE,
+ dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+ dbPort=int(APP_CONFIG.get("DB_PORT", "5432")),
+ dbUser=APP_CONFIG.get("DB_USER"),
+ dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+ )
+
+ def setUserContext(self, currentUser: User, mandateId: str = None):
+ self.currentUser = currentUser
+ self.userId = currentUser.id if currentUser else None
+ self.mandateId = mandateId
+
+ # =========================================================================
+ # Plan catalog (in-memory)
+ # =========================================================================
+
+ def getPlan(self, planKey: str) -> Optional[SubscriptionPlan]:
+ return _getPlan(planKey)
+
+ def getSelectablePlans(self) -> List[SubscriptionPlan]:
+ return _getSelectablePlans()
+
+ # =========================================================================
+ # Read: by ID (primary access pattern)
+ # =========================================================================
+
+ def getById(self, subscriptionId: str) -> Optional[Dict[str, Any]]:
+ """Load a single subscription by its primary key."""
+ try:
+ results = self.db.getRecordset(MandateSubscription, recordFilter={"id": subscriptionId})
+ return dict(results[0]) if results else None
+ except Exception as e:
+ logger.error("getById(%s) failed: %s", subscriptionId, e)
+ return None
+
+ def getByStripeSubscriptionId(self, stripeSubId: str) -> Optional[Dict[str, Any]]:
+ """Load subscription by Stripe subscription ID — the webhook resolution path."""
+ try:
+ results = self.db.getRecordset(MandateSubscription, recordFilter={"stripeSubscriptionId": stripeSubId})
+ return dict(results[0]) if results else None
+ except Exception as e:
+ logger.error("getByStripeSubscriptionId(%s) failed: %s", stripeSubId, e)
+ return None
+
+ # =========================================================================
+ # Read: by mandate (list queries)
+ # =========================================================================
+
+ def listForMandate(self, mandateId: str, statusFilter: List[SubscriptionStatusEnum] = None) -> List[Dict[str, Any]]:
+ """Return all subscriptions for a mandate, optionally filtered by status.
+ Sorted newest-first by startedAt."""
+ try:
+ results = self.db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId})
+ rows = [dict(r) for r in results]
+ if statusFilter:
+ filterValues = {s.value for s in statusFilter}
+ rows = [r for r in rows if r.get("status") in filterValues]
+ rows.sort(key=lambda r: r.get("startedAt", ""), reverse=True)
+ return rows
+ except Exception as e:
+ logger.error("listForMandate(%s) failed: %s", mandateId, e)
+ return []
+
+ def getOperativeForMandate(self, mandateId: str) -> Optional[Dict[str, Any]]:
+ """Return the single operative subscription (ACTIVE, TRIALING, or PAST_DUE).
+ This is a read-only query for the billing gate. Returns None if no operative sub exists."""
+ for row in self.listForMandate(mandateId):
+ if row.get("status") in {s.value for s in OPERATIVE_STATUSES}:
+ return row
+ return None
+
+ def getScheduledForMandate(self, mandateId: str) -> Optional[Dict[str, Any]]:
+ """Return a SCHEDULED subscription if one exists (next sub waiting to start)."""
+ for row in self.listForMandate(mandateId, [SubscriptionStatusEnum.SCHEDULED]):
+ return row
+ return None
+
+ # =========================================================================
+ # Read: global (SysAdmin)
+ # =========================================================================
+
+ def listAll(self, statusFilter: List[SubscriptionStatusEnum] = None) -> List[Dict[str, Any]]:
+ """Return ALL subscriptions across all mandates, newest-first. SysAdmin use only."""
+ try:
+ results = self.db.getRecordset(MandateSubscription)
+ rows = [dict(r) for r in results]
+ if statusFilter:
+ filterValues = {s.value for s in statusFilter}
+ rows = [r for r in rows if r.get("status") in filterValues]
+ rows.sort(key=lambda r: r.get("startedAt", ""), reverse=True)
+ return rows
+ except Exception as e:
+ logger.error("listAll() failed: %s", e)
+ return []
+
+ # =========================================================================
+ # Write: create
+ # =========================================================================
+
+ def createSubscription(self, sub: MandateSubscription) -> Dict[str, Any]:
+ """Persist a new MandateSubscription record."""
+ return self.db.recordCreate(MandateSubscription, sub.model_dump())
+
+ # =========================================================================
+ # Write: update fields (no status change)
+ # =========================================================================
+
+ def updateFields(self, subscriptionId: str, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Update non-status fields on a subscription (e.g. recurring, Stripe IDs, periods).
+ Must NOT be used for status changes — use transitionStatus() for that."""
+ if "status" in data:
+ raise ValueError("updateFields must not change status — use transitionStatus()")
+ return self.db.recordModify(MandateSubscription, subscriptionId, data)
+
+ # =========================================================================
+ # Write: status transition (guarded)
+ # =========================================================================
+
+ def transitionStatus(
+ self,
+ subscriptionId: str,
+ expectedFromStatus: SubscriptionStatusEnum,
+ toStatus: SubscriptionStatusEnum,
+ additionalData: Dict[str, Any] = None,
+ ) -> Dict[str, Any]:
+ """Execute a guarded status transition.
+
+ 1. Load the record by ID
+ 2. Verify current status matches expectedFromStatus
+ 3. Verify the transition is allowed by the state machine
+ 4. Apply the update
+ """
+ sub = self.getById(subscriptionId)
+ if not sub:
+ raise ValueError(f"Subscription {subscriptionId} not found")
+
+ currentStatus = sub.get("status", "")
+ if currentStatus != expectedFromStatus.value:
+ raise InvalidTransitionError(subscriptionId, currentStatus, toStatus.value)
+
+ if (expectedFromStatus, toStatus) not in ALLOWED_TRANSITIONS:
+ raise InvalidTransitionError(subscriptionId, expectedFromStatus.value, toStatus.value)
+
+ updateData = {"status": toStatus.value}
+ if toStatus in TERMINAL_STATUSES and not (additionalData or {}).get("endedAt"):
+ updateData["endedAt"] = datetime.now(timezone.utc).isoformat()
+ if additionalData:
+ updateData.update(additionalData)
+
+ result = self.db.recordModify(MandateSubscription, subscriptionId, updateData)
+ logger.info("Transition %s -> %s for subscription %s", expectedFromStatus.value, toStatus.value, subscriptionId)
+ return result
+
+ def forceExpire(self, subscriptionId: str) -> Dict[str, Any]:
+ """Sysadmin force-expire: ANY non-terminal -> EXPIRED. Bypasses normal transition guards."""
+ sub = self.getById(subscriptionId)
+ if not sub:
+ raise ValueError(f"Subscription {subscriptionId} not found")
+
+ currentStatus = sub.get("status", "")
+ if currentStatus == SubscriptionStatusEnum.EXPIRED.value:
+ raise ValueError(f"Subscription {subscriptionId} is already EXPIRED")
+
+ result = self.db.recordModify(MandateSubscription, subscriptionId, {
+ "status": SubscriptionStatusEnum.EXPIRED.value,
+ "endedAt": datetime.now(timezone.utc).isoformat(),
+ })
+ logger.info("Force-expired subscription %s (was %s)", subscriptionId, currentStatus)
+ return result
+
+ # =========================================================================
+ # Gate: assertActive (read-only, for billing gate)
+ # =========================================================================
+
+ def assertActive(self, mandateId: str) -> SubscriptionStatusEnum:
+ """Return effective status for billing decisions.
+ Returns the operative subscription's status, or EXPIRED if none exists.
+ This is the ONLY read-by-mandate operation used in the hot path."""
+ sub = self.getOperativeForMandate(mandateId)
+ if sub:
+ return SubscriptionStatusEnum(sub["status"])
+ return SubscriptionStatusEnum.EXPIRED
+
+ # =========================================================================
+ # Gate: assertCapacity
+ # =========================================================================
+
+ def assertCapacity(self, mandateId: str, resourceType: str, delta: int = 1) -> bool:
+ sub = self.getOperativeForMandate(mandateId)
+ if not sub:
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionCapacityException
+ raise SubscriptionCapacityException(
+ resourceType=resourceType, currentCount=0, maxAllowed=0,
+ message="No active subscription for this mandate.",
+ )
+
+ plan = self.getPlan(sub.get("planKey", ""))
+ if not plan:
+ return True
+
+ if resourceType == "users":
+ cap = plan.maxUsers
+ if cap is None:
+ return True
+ current = self.countActiveUsers(mandateId)
+ if current + delta > cap:
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionCapacityException
+ raise SubscriptionCapacityException(resourceType=resourceType, currentCount=current, maxAllowed=cap)
+ elif resourceType == "featureInstances":
+ cap = plan.maxFeatureInstances
+ if cap is None:
+ return True
+ current = self.countActiveFeatureInstances(mandateId)
+ if current + delta > cap:
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionCapacityException
+ raise SubscriptionCapacityException(resourceType=resourceType, currentCount=current, maxAllowed=cap)
+
+ return True
+
+ # =========================================================================
+ # Counting (cross-DB queries against poweron_app)
+ # =========================================================================
+
+ def countActiveUsers(self, mandateId: str) -> int:
+ try:
+ appDb = _getAppDatabaseConnector()
+ return len(appDb.getRecordset(UserMandate, recordFilter={"mandateId": mandateId}))
+ except Exception as e:
+ logger.error("countActiveUsers(%s) failed: %s", mandateId, e)
+ return 0
+
+ def countActiveFeatureInstances(self, mandateId: str) -> int:
+ try:
+ from modules.datamodels.datamodelFeatures import FeatureInstance
+ appDb = _getAppDatabaseConnector()
+ return len(appDb.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId, "enabled": True}))
+ except Exception as e:
+ logger.error("countActiveFeatureInstances(%s) failed: %s", mandateId, e)
+ return 0
+
+ # =========================================================================
+ # Stripe quantity sync
+ # =========================================================================
+
+ def syncQuantityToStripe(self, subscriptionId: str) -> None:
+ """Update Stripe subscription item quantities to match actual active counts.
+ Takes subscriptionId, not mandateId."""
+ sub = self.getById(subscriptionId)
+ if not sub or not sub.get("stripeSubscriptionId"):
+ return
+
+ mandateId = sub["mandateId"]
+ itemIdUsers = sub.get("stripeItemIdUsers")
+ itemIdInstances = sub.get("stripeItemIdInstances")
+
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+
+ activeUsers = self.countActiveUsers(mandateId)
+ activeInstances = self.countActiveFeatureInstances(mandateId)
+
+ if itemIdUsers:
+ stripe.SubscriptionItem.modify(
+ itemIdUsers, quantity=max(activeUsers, 0), proration_behavior="create_prorations",
+ )
+ if itemIdInstances:
+ stripe.SubscriptionItem.modify(
+ itemIdInstances, quantity=max(activeInstances, 0), proration_behavior="create_prorations",
+ )
+
+ logger.info("Stripe quantity synced for sub %s: users=%d, instances=%d", subscriptionId, activeUsers, activeInstances)
+ except Exception as e:
+ logger.error("syncQuantityToStripe(%s) failed: %s", subscriptionId, e)
diff --git a/modules/routes/routeAdminFeatures.py b/modules/routes/routeAdminFeatures.py
index 1bada7fa..b31b5e4c 100644
--- a/modules/routes/routeAdminFeatures.py
+++ b/modules/routes/routeAdminFeatures.py
@@ -518,15 +518,40 @@ def create_feature_instance(
detail=f"Feature '{data.featureCode}' not found"
)
+ # Subscription capacity check
+ mandateIdStr = str(context.mandateId)
+ try:
+ from modules.interfaces.interfaceDbSubscription import getInterface as _getSubIf
+ from modules.security.rootAccess import getRootUser
+ _subIf = _getSubIf(getRootUser(), mandateIdStr)
+ _subIf.assertCapacity(mandateIdStr, "featureInstances", delta=1)
+ except HTTPException:
+ raise
+ except Exception as capErr:
+ if "SubscriptionCapacityException" in type(capErr).__name__:
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail=str(capErr),
+ )
+
instance = featureInterface.createFeatureInstance(
featureCode=data.featureCode,
- mandateId=str(context.mandateId),
+ mandateId=mandateIdStr,
label=data.label,
enabled=data.enabled,
copyTemplateRoles=data.copyTemplateRoles,
config=data.config
)
-
+
+ # Sync Stripe quantity after successful creation
+ try:
+ from modules.interfaces.interfaceDbSubscription import getInterface as _getSubIf2
+ from modules.security.rootAccess import getRootUser as _getRU
+ _subIf2 = _getSubIf2(_getRU(), mandateIdStr)
+ _subIf2.syncQuantityToStripe(mandateIdStr)
+ except Exception:
+ pass
+
logger.info(
f"User {context.user.id} created feature instance '{data.label}' "
f"for feature '{data.featureCode}' in mandate {context.mandateId}"
diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py
index eeee79a7..e1c23b48 100644
--- a/modules/routes/routeBilling.py
+++ b/modules/routes/routeBilling.py
@@ -226,9 +226,9 @@ def _filterTransactionsByScope(transactions: list, scope: BillingDataScope) -> l
# =============================================================================
class CreditAddRequest(BaseModel):
- """Request model for adding credit to an account."""
+ """Request model for adding or deducting credit from an account."""
userId: Optional[str] = Field(None, description="Target user ID (for PREPAY_USER model)")
- amount: float = Field(..., gt=0, description="Amount to credit in CHF")
+ amount: float = Field(..., description="Amount in CHF. Positive = credit, negative = deduction. Must not be zero.")
description: str = Field(default="Manual credit", description="Transaction description")
@@ -358,19 +358,8 @@ class UserTransactionResponse(BaseModel):
def _getStripeClient():
"""Initialize and return configured Stripe SDK module."""
- import stripe
- from modules.shared.configuration import APP_CONFIG
-
- api_version = APP_CONFIG.get("STRIPE_API_VERSION")
- if api_version:
- stripe.api_version = api_version
-
- secret_key = APP_CONFIG.get("STRIPE_SECRET_KEY_SECRET") or APP_CONFIG.get("STRIPE_SECRET_KEY")
- if not secret_key:
- raise ValueError("STRIPE_SECRET_KEY_SECRET not configured")
-
- stripe.api_key = secret_key
- return stripe
+ from modules.shared.stripeClient import getStripeClient
+ return getStripeClient()
def _creditStripeSessionIfNeeded(
@@ -835,20 +824,27 @@ def addCredit(
else:
raise HTTPException(status_code=400, detail=f"Cannot add credit to {billingModel.value} billing model")
- # Create credit transaction
+ if creditRequest.amount == 0:
+ raise HTTPException(status_code=400, detail="Amount must not be zero")
+
from modules.datamodels.datamodelBilling import BillingTransaction
-
+
+ isDeduction = creditRequest.amount < 0
+ txType = TransactionTypeEnum.DEBIT if isDeduction else TransactionTypeEnum.CREDIT
+ absAmount = abs(creditRequest.amount)
+
transaction = BillingTransaction(
accountId=account["id"],
- transactionType=TransactionTypeEnum.CREDIT,
- amount=creditRequest.amount,
+ transactionType=txType,
+ amount=absAmount,
description=creditRequest.description,
referenceType=ReferenceTypeEnum.ADMIN
)
result = billingInterface.createTransaction(transaction)
- logger.info(f"Added {creditRequest.amount} CHF credit to account {account['id']} in mandate {targetMandateId}")
+ action = "Deducted" if isDeduction else "Added"
+ logger.info(f"{action} {absAmount} CHF to account {account['id']} in mandate {targetMandateId}")
return result
@@ -1006,13 +1002,33 @@ async def stripeWebhook(
logger.info(f"Stripe webhook received: event={event.id}, type={event.type}")
- accepted_event_types = {"checkout.session.completed", "checkout.session.async_payment_succeeded"}
- if event.type not in accepted_event_types:
+ # Subscription-related events
+ subscriptionEventTypes = {
+ "customer.subscription.updated",
+ "customer.subscription.deleted",
+ "invoice.paid",
+ "invoice.payment_failed",
+ "customer.subscription.trial_will_end",
+ }
+
+ # Checkout events (existing)
+ checkoutEventTypes = {"checkout.session.completed", "checkout.session.async_payment_succeeded"}
+
+ if event.type in subscriptionEventTypes:
+ _handleSubscriptionWebhook(event)
return {"received": True}
-
+
+ if event.type not in checkoutEventTypes:
+ return {"received": True}
+
session = event.data.object
event_id = event.id
+ sessionMode = session.get("mode") if hasattr(session, "get") else getattr(session, "mode", None)
+ if sessionMode == "subscription":
+ _handleSubscriptionCheckoutCompleted(session, event_id)
+ return {"received": True}
+
billingInterface = _getRootInterface()
if billingInterface.getStripeWebhookEventByEventId(event_id):
logger.info(f"Stripe event {event_id} already processed, skipping")
@@ -1027,6 +1043,257 @@ async def stripeWebhook(
return {"received": True}
+def _handleSubscriptionCheckoutCompleted(session, eventId: str) -> None:
+ """Handle checkout.session.completed for mode=subscription.
+ Resolves the local PENDING record by ID from webhook metadata and transitions it."""
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as getSubRootInterface
+ from modules.datamodels.datamodelSubscription import SubscriptionStatusEnum, _getPlan
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ _notifySubscriptionChange,
+ )
+ from modules.security.rootAccess import getRootUser
+ from datetime import datetime, timezone
+
+ metadata = {}
+ if hasattr(session, "get"):
+ metadata = session.get("metadata") or {}
+ subscriptionRecordId = metadata.get("subscriptionRecordId")
+ mandateId = metadata.get("mandateId")
+ planKey = metadata.get("planKey", "")
+
+ platformUrl = metadata.get("platformUrl", "")
+
+ if not subscriptionRecordId:
+ stripeSub = session.get("subscription")
+ if stripeSub:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ subObj = stripe.Subscription.retrieve(stripeSub)
+ metadata = subObj.get("metadata") or {}
+ subscriptionRecordId = metadata.get("subscriptionRecordId")
+ mandateId = metadata.get("mandateId")
+ planKey = metadata.get("planKey", "")
+ platformUrl = platformUrl or metadata.get("platformUrl", "")
+ except Exception:
+ pass
+
+ stripeSubId = session.get("subscription")
+
+ if not mandateId or not subscriptionRecordId:
+ logger.warning("Subscription checkout missing metadata: %s", metadata)
+ return
+
+ subInterface = getSubRootInterface()
+ rootUser = getRootUser()
+
+ sub = subInterface.getById(subscriptionRecordId)
+ if not sub:
+ logger.error("Subscription record %s not found for checkout webhook", subscriptionRecordId)
+ return
+ if sub.get("status") != SubscriptionStatusEnum.PENDING.value:
+ logger.warning("Subscription %s is %s, expected PENDING — skipping", subscriptionRecordId, sub.get("status"))
+ return
+
+ stripeData: Dict[str, Any] = {}
+ if stripeSubId:
+ stripeData["stripeSubscriptionId"] = stripeSubId
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripeSub = stripe.Subscription.retrieve(stripeSubId, expand=["items"])
+
+ if stripeSub.get("current_period_start"):
+ stripeData["currentPeriodStart"] = datetime.fromtimestamp(
+ stripeSub["current_period_start"], tz=timezone.utc
+ ).isoformat()
+ if stripeSub.get("current_period_end"):
+ stripeData["currentPeriodEnd"] = datetime.fromtimestamp(
+ stripeSub["current_period_end"], tz=timezone.utc
+ ).isoformat()
+
+ from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import getStripePricesForPlan
+ priceMapping = getStripePricesForPlan(planKey)
+ for item in stripeSub.get("items", {}).get("data", []):
+ priceId = item.get("price", {}).get("id", "")
+ if priceMapping and priceId == priceMapping.stripePriceIdUsers:
+ stripeData["stripeItemIdUsers"] = item["id"]
+ elif priceMapping and priceId == priceMapping.stripePriceIdInstances:
+ stripeData["stripeItemIdInstances"] = item["id"]
+ except Exception as e:
+ logger.error("Error retrieving Stripe subscription %s: %s", stripeSubId, e)
+
+ if stripeData:
+ subInterface.updateFields(subscriptionRecordId, stripeData)
+
+ operative = subInterface.getOperativeForMandate(mandateId)
+ hasActivePredecessor = operative is not None and operative["id"] != subscriptionRecordId
+
+ if hasActivePredecessor:
+ toStatus = SubscriptionStatusEnum.SCHEDULED
+ if operative.get("recurring", True):
+ operativeStripeId = operative.get("stripeSubscriptionId")
+ if operativeStripeId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.modify(operativeStripeId, cancel_at_period_end=True)
+ except Exception as e:
+ logger.error("Failed to set cancel_at_period_end on predecessor %s: %s", operativeStripeId, e)
+ subInterface.updateFields(operative["id"], {"recurring": False})
+ effectiveFrom = operative.get("currentPeriodEnd")
+ if effectiveFrom:
+ subInterface.updateFields(subscriptionRecordId, {"effectiveFrom": effectiveFrom})
+ else:
+ toStatus = SubscriptionStatusEnum.ACTIVE
+
+ try:
+ subInterface.transitionStatus(
+ subscriptionRecordId, SubscriptionStatusEnum.PENDING, toStatus,
+ {"recurring": True},
+ )
+ except Exception as e:
+ logger.error("Failed to transition subscription %s: %s", subscriptionRecordId, e)
+ return
+
+ subService = getSubscriptionService(rootUser, mandateId)
+ subService.invalidateCache(mandateId)
+
+ if toStatus == SubscriptionStatusEnum.ACTIVE:
+ plan = _getPlan(planKey)
+ updatedSub = subInterface.getById(subscriptionRecordId)
+ _notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=updatedSub, platformUrl=platformUrl)
+
+ logger.info(
+ "Checkout completed: sub=%s -> %s, mandate=%s, plan=%s",
+ subscriptionRecordId, toStatus.value, mandateId, planKey,
+ )
+
+
+def _handleSubscriptionWebhook(event) -> None:
+ """Process Stripe subscription webhook events.
+ All record resolution is by stripeSubscriptionId — no mandate-based guessing."""
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as getSubRootInterface
+ from modules.datamodels.datamodelSubscription import SubscriptionStatusEnum, _getPlan
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ _notifySubscriptionChange,
+ )
+ from modules.security.rootAccess import getRootUser
+ from datetime import datetime, timezone
+
+ obj = event.data.object
+ stripeSubId = obj.get("id") if event.type.startswith("customer.subscription") else obj.get("subscription")
+ if not stripeSubId:
+ logger.warning("Subscription webhook %s has no subscription ID", event.type)
+ return
+
+ subInterface = getSubRootInterface()
+ sub = subInterface.getByStripeSubscriptionId(stripeSubId)
+ if not sub:
+ logger.warning("No local record for Stripe subscription %s (event: %s)", stripeSubId, event.type)
+ return
+
+ subId = sub["id"]
+ mandateId = sub["mandateId"]
+ currentStatus = SubscriptionStatusEnum(sub["status"])
+ rootUser = getRootUser()
+ subService = getSubscriptionService(rootUser, mandateId)
+
+ subMetadata = obj.get("metadata") or {}
+ webhookPlatformUrl = subMetadata.get("platformUrl", "")
+
+ if event.type == "customer.subscription.updated":
+ stripeStatus = obj.get("status", "")
+
+ periodData: Dict[str, Any] = {}
+ if obj.get("current_period_start"):
+ periodData["currentPeriodStart"] = datetime.fromtimestamp(
+ obj["current_period_start"], tz=timezone.utc
+ ).isoformat()
+ if obj.get("current_period_end"):
+ periodData["currentPeriodEnd"] = datetime.fromtimestamp(
+ obj["current_period_end"], tz=timezone.utc
+ ).isoformat()
+ if periodData:
+ subInterface.updateFields(subId, periodData)
+
+ if stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.SCHEDULED:
+ subInterface.transitionStatus(subId, SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.ACTIVE)
+ subService.invalidateCache(mandateId)
+ plan = _getPlan(sub.get("planKey", ""))
+ refreshedSub = subInterface.getById(subId)
+ _notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=refreshedSub, platformUrl=webhookPlatformUrl)
+ logger.info("SCHEDULED -> ACTIVE for sub %s (mandate %s)", subId, mandateId)
+
+ elif stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.PAST_DUE:
+ subInterface.transitionStatus(subId, SubscriptionStatusEnum.PAST_DUE, SubscriptionStatusEnum.ACTIVE)
+ subService.invalidateCache(mandateId)
+ logger.info("PAST_DUE -> ACTIVE for sub %s (mandate %s)", subId, mandateId)
+
+ elif stripeStatus == "past_due" and currentStatus == SubscriptionStatusEnum.ACTIVE:
+ subInterface.transitionStatus(subId, SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.PAST_DUE)
+ subService.invalidateCache(mandateId)
+ logger.info("ACTIVE -> PAST_DUE for sub %s (mandate %s)", subId, mandateId)
+
+ elif stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.ACTIVE:
+ subService.invalidateCache(mandateId)
+ logger.info("Period renewed for sub %s (mandate %s)", subId, mandateId)
+
+ elif event.type == "customer.subscription.deleted":
+ if currentStatus not in (SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.PAST_DUE,
+ SubscriptionStatusEnum.SCHEDULED):
+ logger.info("Ignoring deletion for sub %s in status %s", subId, currentStatus.value)
+ return
+
+ subInterface.transitionStatus(subId, currentStatus, SubscriptionStatusEnum.EXPIRED)
+ subService.invalidateCache(mandateId)
+ logger.info("Sub %s -> EXPIRED (Stripe deleted, mandate %s)", subId, mandateId)
+
+ scheduled = subInterface.getScheduledForMandate(mandateId)
+ if scheduled:
+ try:
+ subInterface.transitionStatus(
+ scheduled["id"], SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.ACTIVE,
+ )
+ subService.invalidateCache(mandateId)
+ plan = _getPlan(scheduled.get("planKey", ""))
+ refreshedScheduled = subInterface.getById(scheduled["id"])
+ _notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=refreshedScheduled, platformUrl=webhookPlatformUrl)
+ logger.info("Promoted SCHEDULED sub %s -> ACTIVE (mandate %s)", scheduled["id"], mandateId)
+ except Exception as e:
+ logger.error("Failed to promote SCHEDULED sub %s: %s", scheduled["id"], e)
+
+ elif event.type == "invoice.payment_failed":
+ if currentStatus == SubscriptionStatusEnum.ACTIVE:
+ subInterface.transitionStatus(subId, SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.PAST_DUE)
+ subService.invalidateCache(mandateId)
+ plan = _getPlan(sub.get("planKey", ""))
+ _notifySubscriptionChange(mandateId, "payment_failed", plan, subscriptionRecord=sub, platformUrl=webhookPlatformUrl)
+ logger.info("Payment failed for sub %s (mandate %s)", subId, mandateId)
+
+ elif event.type == "customer.subscription.trial_will_end":
+ logger.info("Trial ending soon for sub %s (mandate %s)", subId, mandateId)
+ try:
+ from modules.shared.notifyMandateAdmins import notifyMandateAdmins
+ notifyMandateAdmins(
+ mandateId,
+ "[PowerOn] Testphase endet bald",
+ "Testphase endet bald",
+ [
+ "Die kostenlose Testphase für Ihren Mandanten endet in Kürze.",
+ "Bitte wählen Sie einen Plan unter Billing-Verwaltung › Abonnement.",
+ ],
+ )
+ except Exception as e:
+ logger.error("Failed to notify about trial ending: %s", e)
+
+ elif event.type == "invoice.paid":
+ logger.info("Invoice paid for sub %s (mandate %s)", subId, mandateId)
+ return None
+
+
@router.get("/admin/accounts/{targetMandateId}", response_model=List[AccountSummary])
@limiter.limit("30/minute")
def getAccounts(
diff --git a/modules/routes/routeSubscription.py b/modules/routes/routeSubscription.py
new file mode 100644
index 00000000..28ad7aa9
--- /dev/null
+++ b/modules/routes/routeSubscription.py
@@ -0,0 +1,364 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Subscription routes — ID-based, state-machine-driven.
+
+Endpoints:
+- GET /api/subscription/plans — list selectable plans
+- GET /api/subscription/status — operative + scheduled subscription for current mandate
+- POST /api/subscription/activate — start checkout for a plan
+- POST /api/subscription/cancel — cancel a specific subscription (by ID)
+- POST /api/subscription/reactivate — reactivate a cancelled subscription (by ID)
+- POST /api/subscription/force-cancel — sysadmin immediate cancel (by ID)
+"""
+
+from fastapi import APIRouter, HTTPException, Depends, Request
+from fastapi import status
+from typing import Dict, Any, List, Optional
+import logging
+from pydantic import BaseModel, Field
+
+from modules.auth import limiter, getRequestContext, RequestContext
+
+logger = logging.getLogger(__name__)
+
+
+def _resolveMandateId(context: RequestContext) -> str:
+ if context.mandateId:
+ return str(context.mandateId)
+ return ""
+
+
+def _assertMandateAdmin(context: RequestContext, mandateId: str) -> None:
+ if context.hasSysAdminRole:
+ return
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ rootInterface = getRootInterface()
+ userMandates = rootInterface.getUserMandates(str(context.user.id))
+ for um in userMandates:
+ if str(getattr(um, "mandateId", None)) != str(mandateId):
+ continue
+ if not getattr(um, "enabled", True):
+ continue
+ umId = str(getattr(um, "id", ""))
+ roleIds = rootInterface.getRoleIdsForUserMandate(umId)
+ for roleId in roleIds:
+ role = rootInterface.getRole(roleId)
+ if role and role.roleLabel == "admin" and not role.featureInstanceId:
+ return
+ except Exception:
+ pass
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Mandate admin role required")
+
+
+# =============================================================================
+# Request / Response models
+# =============================================================================
+
+class ActivatePlanRequest(BaseModel):
+ planKey: str = Field(..., description="Key of the plan to activate")
+ returnUrl: str = Field(..., description="Frontend URL to redirect back to after Stripe Checkout")
+
+class CancelRequest(BaseModel):
+ subscriptionId: str = Field(..., description="ID of the subscription to cancel")
+
+class ReactivateRequest(BaseModel):
+ subscriptionId: str = Field(..., description="ID of the subscription to reactivate")
+
+class ForceCancelRequest(BaseModel):
+ subscriptionId: str = Field(..., description="ID of the subscription to force-cancel")
+
+class VerifyCheckoutRequest(BaseModel):
+ sessionId: str = Field(..., description="Stripe Checkout Session ID to verify")
+
+class SubscriptionStatusResponse(BaseModel):
+ active: bool
+ subscription: Optional[Dict[str, Any]] = None
+ plan: Optional[Dict[str, Any]] = None
+ scheduled: Optional[Dict[str, Any]] = None
+
+
+# =============================================================================
+# Router
+# =============================================================================
+
+router = APIRouter(
+ prefix="/api/subscription",
+ tags=["Subscription"],
+ responses={404: {"description": "Not found"}},
+)
+
+
+# =============================================================================
+# Endpoints
+# =============================================================================
+
+@router.get("/plans", response_model=List[Dict[str, Any]])
+@limiter.limit("30/minute")
+def getPlans(request: Request, context: RequestContext = Depends(getRequestContext)):
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ )
+ try:
+ mandateId = _resolveMandateId(context)
+ subService = getSubscriptionService(context.user, mandateId)
+ plans = subService.getSelectablePlans()
+ return [p.model_dump() for p in plans]
+ except Exception as e:
+ logger.error("Error fetching plans: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/status", response_model=SubscriptionStatusResponse)
+@limiter.limit("60/minute")
+def getStatus(request: Request, context: RequestContext = Depends(getRequestContext)):
+ """Return the operative subscription and any scheduled successor for the current mandate."""
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ )
+ mandateId = _resolveMandateId(context)
+ if not mandateId:
+ return SubscriptionStatusResponse(active=False)
+ _assertMandateAdmin(context, mandateId)
+
+ try:
+ subService = getSubscriptionService(context.user, mandateId)
+ operative = subService.getOperativeSubscription(mandateId)
+ scheduled = subService.getScheduledSubscription(mandateId)
+
+ if not operative:
+ from modules.datamodels.datamodelSubscription import SubscriptionStatusEnum
+ pending = subService.listSubscriptions(mandateId, [SubscriptionStatusEnum.PENDING])
+ if pending:
+ sub = pending[0]
+ plan = subService.getPlan(sub.get("planKey", ""))
+ return SubscriptionStatusResponse(
+ active=False,
+ subscription=sub,
+ plan=plan.model_dump() if plan else None,
+ scheduled=scheduled,
+ )
+ return SubscriptionStatusResponse(active=False, scheduled=scheduled)
+
+ plan = subService.getPlan(operative.get("planKey", ""))
+ return SubscriptionStatusResponse(
+ active=True,
+ subscription=operative,
+ plan=plan.model_dump() if plan else None,
+ scheduled=scheduled,
+ )
+ except Exception as e:
+ logger.error("Error fetching status: %s", e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/activate", response_model=Dict[str, Any])
+@limiter.limit("10/minute")
+def activatePlan(
+ request: Request,
+ data: ActivatePlanRequest,
+ context: RequestContext = Depends(getRequestContext),
+):
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ )
+ mandateId = _resolveMandateId(context)
+ if not mandateId:
+ raise HTTPException(status_code=400, detail="X-Mandate-Id header required")
+ _assertMandateAdmin(context, mandateId)
+
+ try:
+ subService = getSubscriptionService(context.user, mandateId)
+ return subService.activatePlan(mandateId, data.planKey, returnUrl=data.returnUrl)
+ except ValueError as e:
+ raise HTTPException(status_code=400, detail=str(e))
+ except Exception as e:
+ logger.error("Error activating plan %s: %s", data.planKey, e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/cancel", response_model=Dict[str, Any])
+@limiter.limit("5/minute")
+def cancelSubscription(
+ request: Request,
+ data: CancelRequest,
+ context: RequestContext = Depends(getRequestContext),
+):
+ """Cancel a specific subscription by its ID."""
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ )
+ mandateId = _resolveMandateId(context)
+ if not mandateId:
+ raise HTTPException(status_code=400, detail="X-Mandate-Id header required")
+ _assertMandateAdmin(context, mandateId)
+
+ try:
+ subService = getSubscriptionService(context.user, mandateId)
+ return subService.cancelSubscription(data.subscriptionId)
+ except ValueError as e:
+ raise HTTPException(status_code=400, detail=str(e))
+ except Exception as e:
+ logger.error("Error cancelling subscription %s: %s", data.subscriptionId, e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/reactivate", response_model=Dict[str, Any])
+@limiter.limit("5/minute")
+def reactivateSubscription(
+ request: Request,
+ data: ReactivateRequest,
+ context: RequestContext = Depends(getRequestContext),
+):
+ """Reactivate a cancelled (non-recurring) subscription before its period ends."""
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ )
+ mandateId = _resolveMandateId(context)
+ if not mandateId:
+ raise HTTPException(status_code=400, detail="X-Mandate-Id header required")
+ _assertMandateAdmin(context, mandateId)
+
+ try:
+ subService = getSubscriptionService(context.user, mandateId)
+ return subService.reactivateSubscription(data.subscriptionId)
+ except ValueError as e:
+ raise HTTPException(status_code=400, detail=str(e))
+ except Exception as e:
+ logger.error("Error reactivating subscription %s: %s", data.subscriptionId, e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/force-cancel", response_model=Dict[str, Any])
+@limiter.limit("5/minute")
+def forceCancel(
+ request: Request,
+ data: ForceCancelRequest,
+ context: RequestContext = Depends(getRequestContext),
+):
+ """Sysadmin: immediately expire any non-terminal subscription."""
+ if not context.hasSysAdminRole:
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Sysadmin role required")
+
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ )
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as getSubRootInterface
+ sub = getSubRootInterface().getById(data.subscriptionId)
+ if not sub:
+ raise HTTPException(status_code=404, detail="Subscription not found")
+ mandateId = sub["mandateId"]
+
+ try:
+ subService = getSubscriptionService(context.user, mandateId)
+ return subService.forceCancel(data.subscriptionId)
+ except ValueError as e:
+ raise HTTPException(status_code=400, detail=str(e))
+ except Exception as e:
+ logger.error("Error force-cancelling subscription %s: %s", data.subscriptionId, e)
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/checkout/verify", response_model=Dict[str, Any])
+@limiter.limit("20/minute")
+def verifyCheckout(
+ request: Request,
+ data: VerifyCheckoutRequest,
+ context: RequestContext = Depends(getRequestContext),
+):
+ """Verify a Stripe Checkout Session and activate the subscription if paid.
+
+ This is the synchronous counterpart to the checkout.session.completed webhook.
+ It's called by the frontend immediately after returning from Stripe to handle
+ environments where webhooks may be delayed or unavailable (e.g. localhost dev).
+ The logic is idempotent — if the webhook already processed the session, this is a no-op.
+ """
+ mandateId = _resolveMandateId(context)
+ if not mandateId:
+ raise HTTPException(status_code=400, detail="X-Mandate-Id header required")
+ _assertMandateAdmin(context, mandateId)
+
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ session = stripe.checkout.Session.retrieve(data.sessionId)
+ except Exception as e:
+ logger.error("Failed to retrieve checkout session %s: %s", data.sessionId, e)
+ raise HTTPException(status_code=400, detail="Invalid session ID")
+
+ if session.get("status") != "complete" or session.get("payment_status") != "paid":
+ return {"status": "pending", "message": "Checkout not yet completed"}
+
+ if session.get("mode") != "subscription":
+ raise HTTPException(status_code=400, detail="Not a subscription checkout session")
+
+ from modules.routes.routeBilling import _handleSubscriptionCheckoutCompleted
+ _handleSubscriptionCheckoutCompleted(session, f"verify-{data.sessionId}")
+
+ return {"status": "activated", "message": "Subscription activated"}
+
+
+# =============================================================================
+# SysAdmin: global subscription overview
+# =============================================================================
+
+@router.get("/admin/all", response_model=List[Dict[str, Any]])
+@limiter.limit("30/minute")
+def getAllSubscriptions(
+ request: Request,
+ context: RequestContext = Depends(getRequestContext),
+):
+ """SysAdmin: list ALL subscriptions across all mandates with enriched metadata."""
+ if not context.hasSysAdminRole:
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Sysadmin role required")
+
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as getSubRootInterface
+ from modules.datamodels.datamodelSubscription import BUILTIN_PLANS, OPERATIVE_STATUSES
+
+ subInterface = getSubRootInterface()
+ allSubs = subInterface.listAll()
+
+ mandateNames: Dict[str, str] = {}
+ try:
+ from modules.datamodels.datamodelUam import Mandate
+ from modules.security.rootAccess import getRootDbAppConnector
+ appDb = getRootDbAppConnector()
+ for row in appDb.getRecordset(Mandate):
+ r = dict(row)
+ mid = r.get("id", "")
+ mandateNames[mid] = r.get("label") or r.get("name") or mid[:8]
+ except Exception as e:
+ logger.warning("Could not bulk-resolve mandate names: %s", e)
+
+ operativeValues = {s.value for s in OPERATIVE_STATUSES}
+
+ enriched = []
+ for sub in allSubs:
+ mid = sub.get("mandateId", "")
+ planKey = sub.get("planKey", "")
+ plan = BUILTIN_PLANS.get(planKey)
+
+ sub["mandateName"] = mandateNames.get(mid, mid[:8])
+ sub["planTitle"] = (plan.title.get("de") or plan.title.get("en") or planKey) if plan else planKey
+
+ if sub.get("status") in operativeValues:
+ userPrice = sub.get("snapshotPricePerUserCHF", 0) or 0
+ instPrice = sub.get("snapshotPricePerInstanceCHF", 0) or 0
+ try:
+ userCount = subInterface.countActiveUsers(mid)
+ instanceCount = subInterface.countActiveFeatureInstances(mid)
+ except Exception:
+ userCount = 0
+ instanceCount = 0
+ sub["monthlyRevenueCHF"] = round(userPrice * userCount + instPrice * instanceCount, 2)
+ sub["activeUsers"] = userCount
+ sub["activeInstances"] = instanceCount
+ else:
+ sub["monthlyRevenueCHF"] = 0
+ sub["activeUsers"] = 0
+ sub["activeInstances"] = 0
+
+ enriched.append(sub)
+
+ return enriched
diff --git a/modules/serviceCenter/registry.py b/modules/serviceCenter/registry.py
index 900f9f0e..be0accba 100644
--- a/modules/serviceCenter/registry.py
+++ b/modules/serviceCenter/registry.py
@@ -45,10 +45,17 @@ IMPORTABLE_SERVICES: Dict[str, Dict[str, Any]] = {
"billing": {
"module": "modules.serviceCenter.services.serviceBilling.mainServiceBilling",
"class": "BillingService",
- "dependencies": [],
+ "dependencies": ["subscription"],
"objectKey": "service.billing",
"label": {"en": "Billing", "de": "Abrechnung", "fr": "Facturation"},
},
+ "subscription": {
+ "module": "modules.serviceCenter.services.serviceSubscription.mainServiceSubscription",
+ "class": "SubscriptionService",
+ "dependencies": [],
+ "objectKey": "service.subscription",
+ "label": {"en": "Subscription", "de": "Abonnement", "fr": "Abonnement"},
+ },
"sharepoint": {
"module": "modules.serviceCenter.services.serviceSharepoint.mainServiceSharepoint",
"class": "SharepointService",
diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py
index bee03424..c196d237 100644
--- a/modules/serviceCenter/services/serviceAgent/agentLoop.py
+++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py
@@ -25,6 +25,9 @@ from modules.shared.jsonUtils import closeJsonStructures
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
InsufficientBalanceException,
)
+from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ SubscriptionInactiveException,
+)
logger = logging.getLogger(__name__)
@@ -191,6 +194,18 @@ async def runAgentLoop(
else:
aiResponse = await aiCallFn(aiRequest)
+ except SubscriptionInactiveException as e:
+ logger.warning(
+ f"Subscription inactive in round {state.currentRound} (mandate={mandateId}): {e.message}"
+ )
+ state.status = AgentStatusEnum.ERROR
+ state.abortReason = e.message
+ yield AgentEvent(
+ type=AgentEventTypeEnum.ERROR,
+ content=e.message,
+ data=e.toClientDict(),
+ )
+ break
except InsufficientBalanceException as e:
logger.warning(
f"Insufficient balance in round {state.currentRound} (mandate={mandateId}): {e.message}"
diff --git a/modules/serviceCenter/services/serviceAi/mainServiceAi.py b/modules/serviceCenter/services/serviceAi/mainServiceAi.py
index 90fd4d9a..09e2d708 100644
--- a/modules/serviceCenter/services/serviceAi/mainServiceAi.py
+++ b/modules/serviceCenter/services/serviceAi/mainServiceAi.py
@@ -27,6 +27,10 @@ from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
ProviderNotAllowedException,
BillingContextError
)
+from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ SubscriptionInactiveException,
+ SUBSCRIPTION_REASONS,
+)
logger = logging.getLogger(__name__)
@@ -590,11 +594,25 @@ detectedIntent-Werte:
balanceCheck = billingService.checkBalance(estimatedCost)
if not balanceCheck.allowed:
+ reason = balanceCheck.reason or ""
+
+ if reason in SUBSCRIPTION_REASONS:
+ from modules.datamodels.datamodelSubscription import SubscriptionStatusEnum
+ statusMap = {
+ "SUBSCRIPTION_PAYMENT_REQUIRED": SubscriptionStatusEnum.PAST_DUE,
+ "SUBSCRIPTION_EXPIRED": SubscriptionStatusEnum.EXPIRED,
+ "SUBSCRIPTION_INACTIVE": SubscriptionStatusEnum.EXPIRED,
+ }
+ raise SubscriptionInactiveException(
+ status=statusMap.get(reason, SubscriptionStatusEnum.EXPIRED),
+ mandateId=str(mandateId),
+ )
+
balance_str = f"{(balanceCheck.currentBalance or 0):.2f}"
logger.warning(
f"Billing check failed for user {user.id}: "
f"Balance {balance_str} CHF, "
- f"Reason: {balanceCheck.reason}"
+ f"Reason: {reason}"
)
if balanceCheck.billingModel == BillingModelEnum.PREPAY_MANDATE:
ulabel = (getattr(user, "email", None) or getattr(user, "username", None) or str(user.id))
@@ -651,6 +669,8 @@ detectedIntent-Werte:
logger.debug(f"Provider check passed: {len(rbacAllowedProviders)} providers allowed")
+ except SubscriptionInactiveException:
+ raise
except InsufficientBalanceException:
raise
except ProviderNotAllowedException:
@@ -658,7 +678,6 @@ detectedIntent-Werte:
except BillingContextError:
raise
except Exception as e:
- # FAIL-SAFE: Don't silently swallow errors - log at ERROR level
logger.error(f"BILLING FAIL-SAFE: Billing check failed with unexpected error: {e}")
raise BillingContextError(f"Billing check failed: {e}")
diff --git a/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py b/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py
index aba08b89..d8f2acc4 100644
--- a/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py
+++ b/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py
@@ -1,23 +1,17 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
-When the shared mandate pool (PREPAY_MANDATE) is exhausted, notify billing contacts.
+When the shared mandate pool (PREPAY_MANDATE) is exhausted, notify mandate admins.
-Recipients: BillingSettings.notifyEmails for the mandate (configure as mandate owner / finance).
+Uses the central notifyMandateAdmins() function for recipient resolution and delivery.
Emails are throttled per mandate to avoid spam (one notification per cooldown window).
"""
from __future__ import annotations
-import html
import logging
import time
-from typing import Any, Dict, List, Optional
-
-from modules.datamodels.datamodelMessaging import MessagingChannel
-from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
-from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
-from modules.security.rootAccess import getRootUser
+from typing import Dict
logger = logging.getLogger(__name__)
@@ -26,29 +20,6 @@ _poolExhaustedEmailLastSent: Dict[str, float] = {}
_DEFAULT_COOLDOWN_SEC = 3600
-def _normalizeNotifyEmails(raw: Any) -> List[str]:
- if raw is None:
- return []
- if isinstance(raw, list):
- return [str(e).strip() for e in raw if str(e).strip()]
- if isinstance(raw, str):
- s = raw.strip()
- if not s:
- return []
- # JSON array string
- if s.startswith("["):
- try:
- import json
-
- parsed = json.loads(s)
- if isinstance(parsed, list):
- return [str(e).strip() for e in parsed if str(e).strip()]
- except Exception:
- pass
- return [s]
- return []
-
-
def maybeEmailMandatePoolExhausted(
mandateId: str,
triggeringUserId: str,
@@ -58,7 +29,7 @@ def maybeEmailMandatePoolExhausted(
cooldownSec: float = _DEFAULT_COOLDOWN_SEC,
) -> None:
"""
- Send one email per mandate per cooldown to BillingSettings.notifyEmails.
+ Send one notification per mandate per cooldown window when the pool is exhausted.
Args:
mandateId: Mandate whose pool is empty.
@@ -82,59 +53,22 @@ def maybeEmailMandatePoolExhausted(
return
try:
- billing = getBillingInterface(getRootUser(), mandateId)
- settings = billing.getSettings(mandateId) or {}
- recipients = _normalizeNotifyEmails(settings.get("notifyEmails"))
- if not recipients:
- logger.warning(
- "PREPAY_MANDATE pool exhausted for mandate %s but notifyEmails is empty — "
- "configure BillingSettings.notifyEmails for owner alerts",
- mandateId,
- )
- return
+ from modules.shared.notifyMandateAdmins import notifyMandateAdmins
- subject = f"[PowerOn] Mandanten-Budget aufgebraucht (Mandant {mandateId[:8]}…)"
- body = (
- f"Das gemeinsame Guthaben (PREPAY_MANDATE) für diesen Mandanten ist nicht mehr ausreichend.\n\n"
- f"Mandanten-ID: {mandateId}\n"
- f"Aktuelles Guthaben (Pool): CHF {currentBalance:.2f}\n"
- f"Benötigt (mind.): CHF {requiredAmount:.2f}\n\n"
- f"Auslösende/r Benutzer/in: {triggeringUserLabel} (ID: {triggeringUserId})\n\n"
- f"Bitte laden Sie das Mandats-Guthaben in der Billing-Verwaltung auf, "
- f"damit Benutzer wieder AI-Funktionen nutzen können.\n"
+ sent = notifyMandateAdmins(
+ mandateId,
+ "[PowerOn] Mandanten-Budget aufgebraucht",
+ "Budget aufgebraucht",
+ [
+ "Das gemeinsame Guthaben (Prepaid-Pool) für diesen Mandanten ist nicht mehr ausreichend.",
+ f"Aktuelles Guthaben: CHF {currentBalance:.2f}\n"
+ f"Benötigt (mindestens): CHF {requiredAmount:.2f}",
+ f"Ausgelöst durch: {triggeringUserLabel}",
+ "Bitte laden Sie das Mandats-Guthaben in der Billing-Verwaltung auf, "
+ "damit Benutzer wieder AI-Funktionen nutzen können.",
+ ],
)
- escaped = html.escape(body)
- # Cannot use '\\n' inside f-string {…} expression (SyntaxError); build replacement outside.
- brWithNl = "
" + "\n"
- htmlMessage = f"""
-
-
-{escaped.replace(chr(10), brWithNl)}
-"""
-
- messaging = getMessagingInterface()
- any_ok = False
- for to in recipients:
- try:
- ok = messaging.send(
- channel=MessagingChannel.EMAIL,
- recipient=to,
- subject=subject,
- message=htmlMessage,
- )
- if ok:
- any_ok = True
- else:
- logger.warning("Pool exhausted email failed for %s", to)
- except Exception as send_err:
- logger.error("Error sending pool exhausted email to %s: %s", to, send_err)
-
- if any_ok:
+ if sent > 0:
_poolExhaustedEmailLastSent[mandateId] = now
- logger.info(
- "Sent mandate pool exhausted notification for mandate %s to %s recipient(s)",
- mandateId,
- len(recipients),
- )
except Exception as e:
logger.error("maybeEmailMandatePoolExhausted failed: %s", e, exc_info=True)
diff --git a/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py b/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py
index d0325a5e..969ec6b8 100644
--- a/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py
+++ b/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py
@@ -160,6 +160,10 @@ class BillingService:
def checkBalance(self, estimatedCost: float = 0.0) -> BillingCheckResult:
"""
Check if the current user/mandate has sufficient balance.
+
+ Gate order:
+ 1. Subscription active? (fast, cached) — blocks AI if not
+ 2. Budget sufficient? (existing prepaid logic)
Args:
estimatedCost: Estimated cost of the operation (with markup applied)
@@ -167,11 +171,42 @@ class BillingService:
Returns:
BillingCheckResult indicating if operation is allowed
"""
+ subResult = self._checkSubscription()
+ if subResult is not None:
+ return subResult
+
return self._billingInterface.checkBalance(
self.mandateId,
self.currentUser.id,
estimatedCost
)
+
+ def _checkSubscription(self) -> Optional[BillingCheckResult]:
+ """Return a failing BillingCheckResult if subscription is not active, else None."""
+ try:
+ from modules.datamodels.datamodelSubscription import SubscriptionStatusEnum
+ from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import (
+ getService as getSubscriptionService,
+ _subscriptionReasonForStatus,
+ _subscriptionUserActionForStatus,
+ )
+
+ subService = getSubscriptionService(self.currentUser, self.mandateId)
+ status = subService.assertActive(self.mandateId)
+
+ if status in (SubscriptionStatusEnum.ACTIVE, SubscriptionStatusEnum.TRIALING, SubscriptionStatusEnum.PAST_DUE):
+ return None
+
+ return BillingCheckResult(
+ allowed=False,
+ reason=_subscriptionReasonForStatus(status),
+ upgradeRequired=True,
+ subscriptionUiPath="/admin/billing?tab=subscription",
+ userAction=_subscriptionUserActionForStatus(status),
+ )
+ except Exception as e:
+ logger.warning(f"Subscription check failed (allowing): {e}")
+ return None
def hasBalance(self, estimatedCost: float = 0.0) -> bool:
"""
diff --git a/modules/serviceCenter/services/serviceBilling/stripeCheckout.py b/modules/serviceCenter/services/serviceBilling/stripeCheckout.py
index 9f3f7e68..8d6b4a57 100644
--- a/modules/serviceCenter/services/serviceBilling/stripeCheckout.py
+++ b/modules/serviceCenter/services/serviceBilling/stripeCheckout.py
@@ -82,17 +82,8 @@ def create_checkout_session(
f"Invalid amount {amount_chf} CHF. Allowed: {ALLOWED_AMOUNTS_CHF}"
)
- # Pin API version from config (match Stripe Dashboard)
- api_version = APP_CONFIG.get("STRIPE_API_VERSION")
- if api_version:
- stripe.api_version = api_version
-
- # Get secrets
- secret_key = APP_CONFIG.get("STRIPE_SECRET_KEY_SECRET") or APP_CONFIG.get("STRIPE_SECRET_KEY")
- if not secret_key:
- raise ValueError("STRIPE_SECRET_KEY_SECRET not configured")
-
- stripe.api_key = secret_key
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
base_return_url = _normalizeReturnUrl(return_url)
query_separator = "&" if "?" in base_return_url else "?"
diff --git a/modules/serviceCenter/services/serviceSubscription/__init__.py b/modules/serviceCenter/services/serviceSubscription/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
new file mode 100644
index 00000000..b9a1481c
--- /dev/null
+++ b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py
@@ -0,0 +1,710 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Subscription Service — state-machine-based lifecycle management.
+
+Every mutation takes an explicit subscriptionId. No status-scan guessing.
+See wiki/concepts/Subscription-State-Machine.md for the full state machine.
+"""
+
+import logging
+import time
+from typing import Dict, Any, List, Optional
+from datetime import datetime, timezone, timedelta
+
+from modules.datamodels.datamodelUam import User
+from modules.datamodels.datamodelSubscription import (
+ SubscriptionPlan,
+ MandateSubscription,
+ SubscriptionStatusEnum,
+ BillingPeriodEnum,
+ OPERATIVE_STATUSES,
+ _getPlan,
+ _getSelectablePlans,
+)
+from modules.interfaces.interfaceDbSubscription import (
+ getInterface as getSubscriptionInterface,
+ InvalidTransitionError,
+)
+
+logger = logging.getLogger(__name__)
+
+SUBSCRIPTION_CACHE_TTL_SECONDS = 60
+_STALE_PENDING_SECONDS = 30 * 60
+
+_subscriptionServices: Dict[str, "SubscriptionService"] = {}
+_statusCache: Dict[str, tuple] = {}
+
+
+def getService(currentUser: User, mandateId: str) -> "SubscriptionService":
+ cacheKey = f"{currentUser.id}_{mandateId}"
+ if cacheKey not in _subscriptionServices:
+ _subscriptionServices[cacheKey] = SubscriptionService(currentUser, mandateId)
+ else:
+ _subscriptionServices[cacheKey].setContext(currentUser, mandateId)
+ return _subscriptionServices[cacheKey]
+
+
+class SubscriptionService:
+ """State-machine-based subscription service.
+ All mutations use explicit subscriptionId. No scan-based writes."""
+
+ def __init__(self, contextOrUser, mandateId=None, get_service=None):
+ if mandateId is not None and callable(mandateId):
+ ctx = contextOrUser
+ self.currentUser = ctx.user
+ self.mandateId = ctx.mandate_id or ""
+ elif get_service is not None and hasattr(contextOrUser, "user"):
+ ctx = contextOrUser
+ self.currentUser = ctx.user
+ self.mandateId = ctx.mandate_id or ""
+ else:
+ self.currentUser = contextOrUser
+ self.mandateId = mandateId or ""
+ self._interface = getSubscriptionInterface(self.currentUser, self.mandateId)
+
+ def setContext(self, currentUser: User, mandateId: str):
+ self.currentUser = currentUser
+ self.mandateId = mandateId
+ self._interface = getSubscriptionInterface(currentUser, mandateId)
+
+ # =========================================================================
+ # Billing gate (cached, read-only)
+ # =========================================================================
+
+ def assertActive(self, mandateId: str = None) -> SubscriptionStatusEnum:
+ """Return subscription status for billing decisions. Uses TTL cache.
+ This is the ONLY method that works by mandateId (read-only)."""
+ mid = mandateId or self.mandateId
+ now = time.monotonic()
+
+ cached = _statusCache.get(mid)
+ if cached and cached[1] > now:
+ return cached[0]
+
+ status = self._interface.assertActive(mid)
+ _statusCache[mid] = (status, now + SUBSCRIPTION_CACHE_TTL_SECONDS)
+ return status
+
+ def invalidateCache(self, mandateId: str = None):
+ mid = mandateId or self.mandateId
+ _statusCache.pop(mid, None)
+
+ # =========================================================================
+ # Capacity (delegation)
+ # =========================================================================
+
+ def assertCapacity(self, mandateId: str, resourceType: str, delta: int = 1) -> bool:
+ return self._interface.assertCapacity(mandateId or self.mandateId, resourceType, delta)
+
+ # =========================================================================
+ # Read operations
+ # =========================================================================
+
+ def getById(self, subscriptionId: str) -> Optional[Dict[str, Any]]:
+ return self._interface.getById(subscriptionId)
+
+ def getOperativeSubscription(self, mandateId: str = None) -> Optional[Dict[str, Any]]:
+ return self._interface.getOperativeForMandate(mandateId or self.mandateId)
+
+ def getScheduledSubscription(self, mandateId: str = None) -> Optional[Dict[str, Any]]:
+ return self._interface.getScheduledForMandate(mandateId or self.mandateId)
+
+ def listSubscriptions(self, mandateId: str = None, statusFilter=None) -> List[Dict[str, Any]]:
+ return self._interface.listForMandate(mandateId or self.mandateId, statusFilter)
+
+ def getSelectablePlans(self) -> List[SubscriptionPlan]:
+ return _getSelectablePlans()
+
+ def getPlan(self, planKey: str) -> Optional[SubscriptionPlan]:
+ return _getPlan(planKey)
+
+ # =========================================================================
+ # T1/T2: Plan activation (creates PENDING, returns checkout URL)
+ # =========================================================================
+
+ def activatePlan(self, mandateId: str, planKey: str, returnUrl: str) -> Dict[str, Any]:
+ """Create a new subscription as PENDING and start the checkout flow.
+
+ - Free/trial plans: immediately ACTIVE/TRIALING (no checkout).
+ - Paid plans with active predecessor: PENDING -> checkout -> SCHEDULED on confirmation.
+ - Paid plans without predecessor: PENDING -> checkout -> ACTIVE on confirmation.
+
+ Cleans up any existing PENDING/SCHEDULED for this mandate first (by ID)."""
+ mid = mandateId or self.mandateId
+ plan = _getPlan(planKey)
+ if not plan:
+ raise ValueError(f"Unknown plan: {planKey}")
+
+ isPaid = plan.billingPeriod != BillingPeriodEnum.NONE and not plan.trialDays
+ currentOperative = self._interface.getOperativeForMandate(mid)
+
+ self._cleanupPreparatorySubscriptions(mid)
+
+ now = datetime.now(timezone.utc)
+ if plan.trialDays:
+ initialStatus = SubscriptionStatusEnum.TRIALING
+ elif isPaid:
+ initialStatus = SubscriptionStatusEnum.PENDING
+ else:
+ initialStatus = SubscriptionStatusEnum.ACTIVE
+
+ sub = MandateSubscription(
+ mandateId=mid,
+ planKey=planKey,
+ status=initialStatus,
+ recurring=plan.autoRenew and not plan.trialDays,
+ startedAt=now,
+ currentPeriodStart=now,
+ snapshotPricePerUserCHF=plan.pricePerUserCHF,
+ snapshotPricePerInstanceCHF=plan.pricePerFeatureInstanceCHF,
+ )
+
+ if plan.trialDays:
+ sub.trialEndsAt = now + timedelta(days=plan.trialDays)
+
+ if plan.billingPeriod == BillingPeriodEnum.MONTHLY:
+ sub.currentPeriodEnd = now + timedelta(days=30)
+ elif plan.billingPeriod == BillingPeriodEnum.YEARLY:
+ sub.currentPeriodEnd = now + timedelta(days=365)
+
+ created = self._interface.createSubscription(sub)
+
+ from urllib.parse import urlparse
+ parsed = urlparse(returnUrl) if returnUrl else None
+ pUrl = f"{parsed.scheme}://{parsed.netloc}" if parsed and parsed.scheme else ""
+
+ if isPaid:
+ try:
+ checkoutUrl = self._createCheckoutSession(mid, plan, created, currentOperative, returnUrl)
+ created["redirectUrl"] = checkoutUrl
+ except Exception as e:
+ self._interface.forceExpire(created["id"])
+ self.invalidateCache(mid)
+ raise ValueError(f"Subscription konnte nicht erstellt werden: {e}") from e
+ else:
+ if currentOperative:
+ self._expireOperative(currentOperative["id"], mid)
+ _notifySubscriptionChange(mid, "activated", plan, subscriptionRecord=created, platformUrl=pUrl)
+
+ self.invalidateCache(mid)
+ return created
+
+ def _cleanupPreparatorySubscriptions(self, mandateId: str) -> None:
+ """Expire any existing PENDING or SCHEDULED subscriptions for this mandate (by ID)."""
+ preparatory = self._interface.listForMandate(
+ mandateId, [SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.SCHEDULED],
+ )
+ for sub in preparatory:
+ subId = sub["id"]
+ currentStatus = SubscriptionStatusEnum(sub["status"])
+ stripeSubId = sub.get("stripeSubscriptionId")
+
+ if stripeSubId and currentStatus == SubscriptionStatusEnum.SCHEDULED:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.cancel(stripeSubId)
+ except Exception as e:
+ logger.error("Failed to cancel Stripe sub %s during cleanup: %s", stripeSubId, e)
+
+ self._interface.transitionStatus(subId, currentStatus, SubscriptionStatusEnum.EXPIRED)
+ logger.info("Cleaned up %s subscription %s for mandate %s", currentStatus.value, subId, mandateId)
+
+ def _expireOperative(self, subscriptionId: str, mandateId: str) -> None:
+ """Expire the current operative subscription (used when a free/trial plan replaces it)."""
+ sub = self._interface.getById(subscriptionId)
+ if not sub:
+ return
+ currentStatus = SubscriptionStatusEnum(sub["status"])
+ if currentStatus in OPERATIVE_STATUSES:
+ stripeSubId = sub.get("stripeSubscriptionId")
+ if stripeSubId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.cancel(stripeSubId)
+ except Exception as e:
+ logger.error("Failed to cancel Stripe sub %s: %s", stripeSubId, e)
+ self._interface.transitionStatus(subscriptionId, currentStatus, SubscriptionStatusEnum.EXPIRED)
+
+ def _createCheckoutSession(
+ self, mandateId: str, plan: SubscriptionPlan, subRecord: Dict[str, Any],
+ currentOperative: Optional[Dict[str, Any]], returnUrl: str,
+ ) -> str:
+ """Create a Stripe Checkout Session. If a predecessor exists, delays billing
+ via trial_end to start after the predecessor's period end."""
+ from modules.shared.stripeClient import getStripeClient
+ from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import getStripePricesForPlan
+
+ stripe = getStripeClient()
+ priceMapping = getStripePricesForPlan(plan.planKey)
+ if not priceMapping or (not priceMapping.stripePriceIdUsers and not priceMapping.stripePriceIdInstances):
+ raise ValueError(f"Stripe Price IDs not provisioned for plan {plan.planKey}")
+
+ stripeCustomerId = self._resolveStripeCustomer(mandateId)
+ if not stripeCustomerId:
+ raise ValueError(f"Could not resolve Stripe customer for mandate {mandateId}")
+
+ activeUsers = self._interface.countActiveUsers(mandateId)
+ activeInstances = self._interface.countActiveFeatureInstances(mandateId)
+
+ lineItems = []
+ if priceMapping.stripePriceIdUsers:
+ lineItems.append({"price": priceMapping.stripePriceIdUsers, "quantity": max(activeUsers, 1)})
+ if priceMapping.stripePriceIdInstances and activeInstances > 0:
+ lineItems.append({"price": priceMapping.stripePriceIdInstances, "quantity": activeInstances})
+
+ if not returnUrl:
+ raise ValueError("returnUrl is required for paid subscription checkout")
+
+ from urllib.parse import urlparse
+ parsedReturn = urlparse(returnUrl)
+ platformUrl = f"{parsedReturn.scheme}://{parsedReturn.netloc}" if parsedReturn.scheme else ""
+
+ separator = "&" if "?" in returnUrl else "?"
+ successUrl = f"{returnUrl}{separator}success=true&session_id={{CHECKOUT_SESSION_ID}}"
+ cancelUrl = f"{returnUrl}{separator}canceled=true"
+
+ subscriptionData: Dict[str, Any] = {
+ "metadata": {
+ "mandateId": mandateId,
+ "subscriptionRecordId": subRecord["id"],
+ "planKey": plan.planKey,
+ "platformUrl": platformUrl,
+ },
+ }
+
+ if currentOperative and currentOperative.get("currentPeriodEnd"):
+ periodEnd = currentOperative["currentPeriodEnd"]
+ if isinstance(periodEnd, str):
+ periodEnd = datetime.fromisoformat(periodEnd)
+ trialEndTs = int(periodEnd.timestamp())
+ subscriptionData["trial_end"] = trialEndTs
+ self._interface.updateFields(subRecord["id"], {"effectiveFrom": periodEnd.isoformat()})
+
+ session = stripe.checkout.Session.create(
+ mode="subscription",
+ customer=stripeCustomerId,
+ line_items=lineItems,
+ success_url=successUrl,
+ cancel_url=cancelUrl,
+ subscription_data=subscriptionData,
+ )
+
+ if not session or not session.url:
+ raise ValueError("Stripe Checkout Session creation failed")
+
+ logger.info("Checkout session %s created for mandate %s, plan %s", session.id, mandateId, plan.planKey)
+ return session.url
+
+ def _resolveStripeCustomer(self, mandateId: str) -> Optional[str]:
+ try:
+ from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
+ billingIf = getBillingInterface(self.currentUser, mandateId)
+ settings = billingIf.getSettings(mandateId)
+ if not settings:
+ return None
+ customerId = settings.get("stripeCustomerId")
+ if customerId:
+ return customerId
+
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+
+ mandateLabel = mandateId
+ try:
+ from modules.datamodels.datamodelUam import Mandate
+ from modules.security.rootAccess import getRootDbAppConnector
+ appDb = getRootDbAppConnector()
+ rows = appDb.getRecordset(Mandate, recordFilter={"id": mandateId})
+ if rows:
+ mandateLabel = rows[0].get("label") or rows[0].get("name") or mandateId
+ except Exception:
+ pass
+
+ customer = stripe.Customer.create(name=mandateLabel, metadata={"mandateId": mandateId})
+ billingIf.updateSettings(settings["id"], {"stripeCustomerId": customer.id})
+ logger.info("Stripe customer %s created for mandate %s", customer.id, mandateId)
+ return customer.id
+ except Exception as e:
+ logger.error("_resolveStripeCustomer(%s) failed: %s", mandateId, e)
+ return None
+
+ # =========================================================================
+ # T7: Cancel (set recurring=false)
+ # =========================================================================
+
+ def cancelSubscription(self, subscriptionId: str) -> Dict[str, Any]:
+ """Cancel a subscription (T7: set recurring=false, Stripe cancel_at_period_end).
+ The subscription stays ACTIVE until its period ends."""
+ sub = self._interface.getById(subscriptionId)
+ if not sub:
+ raise ValueError(f"Subscription {subscriptionId} not found")
+
+ status = sub.get("status", "")
+ mandateId = sub["mandateId"]
+
+ if status == SubscriptionStatusEnum.PENDING.value:
+ result = self._interface.transitionStatus(
+ subscriptionId, SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.EXPIRED,
+ )
+ self.invalidateCache(mandateId)
+ return result
+
+ if status == SubscriptionStatusEnum.SCHEDULED.value:
+ stripeSubId = sub.get("stripeSubscriptionId")
+ if stripeSubId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.cancel(stripeSubId)
+ except Exception as e:
+ logger.error("Failed to cancel Stripe sub %s: %s", stripeSubId, e)
+ result = self._interface.transitionStatus(
+ subscriptionId, SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.EXPIRED,
+ )
+ self.invalidateCache(mandateId)
+ return result
+
+ if status != SubscriptionStatusEnum.ACTIVE.value:
+ raise ValueError(f"Cannot cancel subscription in status {status}")
+
+ if not sub.get("recurring", True):
+ raise ValueError("Subscription is already cancelled (non-recurring)")
+
+ stripeSubId = sub.get("stripeSubscriptionId")
+ if stripeSubId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.modify(stripeSubId, cancel_at_period_end=True)
+ except Exception as e:
+ logger.error("Failed to set cancel_at_period_end for %s: %s", stripeSubId, e)
+
+ result = self._interface.updateFields(subscriptionId, {"recurring": False})
+ self.invalidateCache(mandateId)
+
+ plan = _getPlan(sub.get("planKey", ""))
+ _notifySubscriptionChange(mandateId, "cancelled", plan)
+ return result
+
+ # =========================================================================
+ # T8: Reactivate (set recurring=true)
+ # =========================================================================
+
+ def reactivateSubscription(self, subscriptionId: str) -> Dict[str, Any]:
+ """Reactivate a cancelled subscription before its period ends (T8: recurring=true)."""
+ sub = self._interface.getById(subscriptionId)
+ if not sub:
+ raise ValueError(f"Subscription {subscriptionId} not found")
+
+ if sub.get("status") != SubscriptionStatusEnum.ACTIVE.value:
+ raise ValueError(f"Can only reactivate ACTIVE subscriptions, got {sub.get('status')}")
+ if sub.get("recurring", True):
+ raise ValueError("Subscription is already recurring")
+
+ periodEnd = sub.get("currentPeriodEnd")
+ if periodEnd:
+ if isinstance(periodEnd, str):
+ periodEnd = datetime.fromisoformat(periodEnd)
+ if periodEnd <= datetime.now(timezone.utc):
+ raise ValueError("Cannot reactivate — period has already ended")
+
+ stripeSubId = sub.get("stripeSubscriptionId")
+ if stripeSubId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.modify(stripeSubId, cancel_at_period_end=False)
+ except Exception as e:
+ logger.error("Failed to reactivate Stripe sub %s: %s", stripeSubId, e)
+
+ result = self._interface.updateFields(subscriptionId, {"recurring": True})
+ self.invalidateCache(sub["mandateId"])
+ return result
+
+ # =========================================================================
+ # T13: Sysadmin force-cancel
+ # =========================================================================
+
+ def forceCancel(self, subscriptionId: str) -> Dict[str, Any]:
+ """Sysadmin force-cancel: immediately expire any non-terminal subscription."""
+ sub = self._interface.getById(subscriptionId)
+ if not sub:
+ raise ValueError(f"Subscription {subscriptionId} not found")
+
+ stripeSubId = sub.get("stripeSubscriptionId")
+ if stripeSubId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ stripe.Subscription.cancel(stripeSubId)
+ except Exception as e:
+ logger.error("Failed to cancel Stripe sub %s: %s", stripeSubId, e)
+
+ result = self._interface.forceExpire(subscriptionId)
+ self.invalidateCache(sub["mandateId"])
+ return result
+
+ # =========================================================================
+ # T6: Trial expiry
+ # =========================================================================
+
+ def handleTrialExpiry(self, subscriptionId: str) -> None:
+ """Expire a trial subscription (T6: TRIALING -> EXPIRED)."""
+ sub = self._interface.getById(subscriptionId)
+ if not sub or sub.get("status") != SubscriptionStatusEnum.TRIALING.value:
+ return
+
+ self._interface.transitionStatus(
+ subscriptionId, SubscriptionStatusEnum.TRIALING, SubscriptionStatusEnum.EXPIRED,
+ )
+ self.invalidateCache(sub["mandateId"])
+
+ plan = _getPlan(sub.get("planKey", ""))
+ successorPlan = _getPlan(plan.successorPlanKey) if plan and plan.successorPlanKey else None
+ _notifySubscriptionChange(sub["mandateId"], "trial_expired", successorPlan)
+ logger.info("Trial expired for subscription %s", subscriptionId)
+
+ # =========================================================================
+ # Stripe quantity sync
+ # =========================================================================
+
+ def syncStripeQuantity(self, subscriptionId: str):
+ self._interface.syncQuantityToStripe(subscriptionId)
+
+
+# ============================================================================
+# Notifications
+# ============================================================================
+
+def _notifySubscriptionChange(
+ mandateId: str,
+ event: str,
+ plan: Optional[SubscriptionPlan] = None,
+ subscriptionRecord: Optional[Dict[str, Any]] = None,
+ platformUrl: str = "",
+) -> None:
+ try:
+ from modules.shared.notifyMandateAdmins import notifyMandateAdmins
+
+ planLabel = (plan.title.get("de") or plan.title.get("en") or plan.planKey) if plan else "—"
+ platformHint = f"Plattform: {platformUrl}" if platformUrl else ""
+
+ rawHtmlBlock: Optional[str] = None
+
+ if event == "activated" and plan and subscriptionRecord:
+ rawHtmlBlock = _buildInvoiceSummaryHtml(plan, subscriptionRecord, mandateId, platformUrl)
+
+ templates: Dict[str, Dict[str, Any]] = {
+ "activated": {
+ "subject": f"[PowerOn] Abonnement aktiviert — {planLabel}",
+ "headline": "Abonnement aktiviert",
+ "paragraphs": [
+ p for p in [
+ f"Das Abonnement wurde auf den Plan «{planLabel}» aktiviert.",
+ platformHint,
+ "Sie können Ihr Abonnement jederzeit unter Billing-Verwaltung › Abonnement einsehen und verwalten.",
+ ] if p
+ ],
+ },
+ "cancelled": {
+ "subject": f"[PowerOn] Abonnement gekündigt — {planLabel}",
+ "headline": "Abonnement gekündigt",
+ "paragraphs": [
+ p for p in [
+ f"Das Abonnement «{planLabel}» wurde gekündigt.",
+ platformHint,
+ "Die Kündigung wird zum Ende der aktuellen bezahlten Periode wirksam. Bis dahin bleibt der volle Zugang bestehen.",
+ ] if p
+ ],
+ },
+ "trial_expired": {
+ "subject": "[PowerOn] Testphase abgelaufen",
+ "headline": "Testphase abgelaufen",
+ "paragraphs": [
+ p for p in [
+ "Die kostenlose Testphase ist abgelaufen.",
+ platformHint,
+ "Bitte wählen Sie einen Plan unter Billing-Verwaltung › Abonnement, damit der Zugang nicht unterbrochen wird.",
+ ] if p
+ ],
+ },
+ "payment_failed": {
+ "subject": f"[PowerOn] Zahlung fehlgeschlagen — {planLabel}",
+ "headline": "Zahlung fehlgeschlagen",
+ "paragraphs": [
+ p for p in [
+ f"Die Zahlung für das Abonnement «{planLabel}» ist fehlgeschlagen.",
+ platformHint,
+ "Bitte aktualisieren Sie Ihr Zahlungsmittel unter Billing-Verwaltung.",
+ ] if p
+ ],
+ },
+ }
+
+ tpl = templates.get(event, {
+ "subject": f"[PowerOn] Abonnement-Änderung — {planLabel}",
+ "headline": "Abonnement-Änderung",
+ "paragraphs": [f"Änderung am Abonnement «{planLabel}»."],
+ })
+
+ notifyMandateAdmins(
+ mandateId, tpl["subject"], tpl["headline"], tpl["paragraphs"],
+ rawHtmlBlock=rawHtmlBlock,
+ )
+ except Exception as e:
+ logger.error("_notifySubscriptionChange failed for mandate %s event %s: %s", mandateId, event, e)
+
+
+def _buildInvoiceSummaryHtml(
+ plan: SubscriptionPlan,
+ subRecord: Dict[str, Any],
+ mandateId: str,
+ platformUrl: str = "",
+) -> str:
+ """Build an HTML invoice summary block for inclusion in the activation email."""
+ import html as htmlmod
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as getSubRootInterface
+
+ subInterface = getSubRootInterface()
+ userCount = subInterface.countActiveUsers(mandateId)
+ instanceCount = subInterface.countActiveFeatureInstances(mandateId)
+
+ userPrice = plan.pricePerUserCHF
+ instancePrice = plan.pricePerFeatureInstanceCHF
+ userTotal = userCount * userPrice
+ instanceTotal = instanceCount * instancePrice
+ netTotal = userTotal + instanceTotal
+
+ periodLabel = {"MONTHLY": "Monatlich", "YEARLY": "Jährlich"}.get(plan.billingPeriod, plan.billingPeriod)
+
+ def _chf(amount: float) -> str:
+ return f"CHF {amount:,.2f}".replace(",", "'")
+
+ rows = ""
+ if userPrice > 0:
+ rows += (
+ f'| Benutzer-Lizenzen | '
+ f'{userCount} × {_chf(userPrice)} | '
+ f'{_chf(userTotal)} |
\n'
+ )
+ if instancePrice > 0:
+ rows += (
+ f'| Feature-Instanzen | '
+ f'{instanceCount} × {_chf(instancePrice)} | '
+ f'{_chf(instanceTotal)} |
\n'
+ )
+
+ invoiceLink = ""
+ stripeSubId = subRecord.get("stripeSubscriptionId")
+ if stripeSubId:
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ invoices = stripe.Invoice.list(subscription=stripeSubId, limit=1)
+ if invoices.data:
+ hostedUrl = invoices.data[0].get("hosted_invoice_url", "")
+ if hostedUrl:
+ invoiceLink = (
+ f''
+ f''
+ f'Vollständige Rechnung mit MwSt-Ausweis anzeigen
\n'
+ )
+ except Exception as e:
+ logger.warning("Could not fetch Stripe invoice URL for sub %s: %s", stripeSubId, e)
+
+ return (
+ f''
+ f''
+ f'| Position | '
+ f'Menge × Preis | '
+ f'Total | '
+ f'
'
+ f'{rows}'
+ f''
+ f'| Netto-Total ({periodLabel}) | '
+ f' | '
+ f'{_chf(netTotal)} | '
+ f'
'
+ f'
'
+ f'{invoiceLink}'
+ )
+
+
+# ============================================================================
+# Exception Classes
+# ============================================================================
+
+SUBSCRIPTION_USER_ACTION_UPGRADE = "UPGRADE_SUBSCRIPTION"
+SUBSCRIPTION_USER_ACTION_REACTIVATE = "REACTIVATE_SUBSCRIPTION"
+SUBSCRIPTION_USER_ACTION_ADD_PAYMENT = "ADD_PAYMENT_METHOD"
+
+SUBSCRIPTION_REASONS = {
+ "SUBSCRIPTION_INACTIVE",
+ "SUBSCRIPTION_PAYMENT_REQUIRED",
+ "SUBSCRIPTION_PAYMENT_PENDING",
+ "SUBSCRIPTION_EXPIRED",
+}
+
+
+def _subscriptionReasonForStatus(status: SubscriptionStatusEnum) -> str:
+ if status == SubscriptionStatusEnum.PENDING:
+ return "SUBSCRIPTION_PAYMENT_PENDING"
+ if status == SubscriptionStatusEnum.PAST_DUE:
+ return "SUBSCRIPTION_PAYMENT_REQUIRED"
+ if status == SubscriptionStatusEnum.EXPIRED:
+ return "SUBSCRIPTION_EXPIRED"
+ return "SUBSCRIPTION_INACTIVE"
+
+
+def _subscriptionUserActionForStatus(status: SubscriptionStatusEnum) -> str:
+ if status in (SubscriptionStatusEnum.PAST_DUE, SubscriptionStatusEnum.PENDING):
+ return SUBSCRIPTION_USER_ACTION_ADD_PAYMENT
+ return SUBSCRIPTION_USER_ACTION_UPGRADE
+
+
+class SubscriptionInactiveException(Exception):
+ def __init__(self, status: SubscriptionStatusEnum, mandateId: str = "", message: Optional[str] = None):
+ self.status = status
+ self.mandateId = mandateId
+ self.reason = _subscriptionReasonForStatus(status)
+ self.userAction = _subscriptionUserActionForStatus(status)
+ self.message = message or (
+ "Kein aktives Abonnement für diesen Mandanten. Bitte wählen Sie einen Plan unter Billing."
+ )
+ super().__init__(self.message)
+
+ def toClientDict(self) -> Dict[str, Any]:
+ out: Dict[str, Any] = {
+ "error": self.reason, "message": self.message,
+ "userAction": self.userAction, "subscriptionUiPath": "/admin/billing?tab=subscription",
+ }
+ if self.mandateId:
+ out["mandateId"] = self.mandateId
+ return out
+
+
+class SubscriptionCapacityException(Exception):
+ def __init__(self, resourceType: str, currentCount: int, maxAllowed: int, message: Optional[str] = None):
+ self.resourceType = resourceType
+ self.currentCount = currentCount
+ self.maxAllowed = maxAllowed
+ self.message = message or (
+ f"Ihr Plan erlaubt maximal {maxAllowed} {'Benutzer' if resourceType == 'users' else 'Feature-Instanzen'} "
+ f"(aktuell {currentCount}). Bitte wechseln Sie zu einem grösseren Plan."
+ )
+ super().__init__(self.message)
+
+ def toClientDict(self) -> Dict[str, Any]:
+ return {
+ "error": f"SUBSCRIPTION_{self.resourceType.upper()}_LIMIT",
+ "currentCount": self.currentCount, "maxAllowed": self.maxAllowed,
+ "message": self.message, "userAction": SUBSCRIPTION_USER_ACTION_UPGRADE,
+ "subscriptionUiPath": "/admin/billing?tab=subscription",
+ }
+
+
+SubscriptionService.SubscriptionInactiveException = SubscriptionInactiveException
+SubscriptionService.SubscriptionCapacityException = SubscriptionCapacityException
diff --git a/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py b/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py
new file mode 100644
index 00000000..fd5666e0
--- /dev/null
+++ b/modules/serviceCenter/services/serviceSubscription/stripeBootstrap.py
@@ -0,0 +1,214 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Auto-provision Stripe Products and Prices from the built-in plan catalog.
+
+Creates separate Stripe Products for user licenses and feature instances
+so that invoice line items show clear, descriptive names:
+ - "Benutzer-Lizenzen"
+ - "Feature-Instanzen"
+
+Idempotent — safe to call on every startup.
+"""
+
+import logging
+from typing import Dict, Optional
+
+from modules.connectors.connectorDbPostgre import DatabaseConnector
+from modules.shared.configuration import APP_CONFIG
+from modules.datamodels.datamodelSubscription import (
+ BUILTIN_PLANS,
+ SubscriptionPlan,
+ BillingPeriodEnum,
+ StripePlanPrice,
+)
+
+logger = logging.getLogger(__name__)
+
+_BILLING_DATABASE = "poweron_billing"
+_METADATA_KEY = "poweron_plan_key"
+_METADATA_LINE_TYPE = "poweron_line_type"
+
+_PERIOD_TO_STRIPE = {
+ BillingPeriodEnum.MONTHLY: {"interval": "month", "interval_count": 1},
+ BillingPeriodEnum.YEARLY: {"interval": "year", "interval_count": 1},
+}
+
+
+def _getBillingDb() -> DatabaseConnector:
+ return DatabaseConnector(
+ dbDatabase=_BILLING_DATABASE,
+ dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+ dbPort=int(APP_CONFIG.get("DB_PORT", "5432")),
+ dbUser=APP_CONFIG.get("DB_USER"),
+ dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+ )
+
+
+def _loadExistingMappings(db: DatabaseConnector) -> Dict[str, StripePlanPrice]:
+ try:
+ rows = db.getRecordset(StripePlanPrice)
+ result = {}
+ for row in rows:
+ pk = row.get("planKey")
+ if pk:
+ result[pk] = StripePlanPrice(**{k: v for k, v in row.items() if not k.startswith("_")})
+ return result
+ except Exception as e:
+ logger.warning("Could not load StripePlanPrice records: %s", e)
+ return {}
+
+
+def _findStripeProduct(stripe, planKey: str, lineType: str) -> Optional[str]:
+ """Search Stripe for a product tagged with plan key + line type."""
+ try:
+ products = stripe.Product.search(
+ query=f'metadata["{_METADATA_KEY}"]:"{planKey}" AND metadata["{_METADATA_LINE_TYPE}"]:"{lineType}"',
+ limit=1,
+ )
+ if products.data:
+ return products.data[0].id
+ except Exception:
+ try:
+ products = stripe.Product.search(
+ query=f'metadata["{_METADATA_KEY}"]:"{planKey}"',
+ limit=10,
+ )
+ for p in products.data:
+ meta = p.get("metadata") or {}
+ if meta.get(_METADATA_LINE_TYPE) == lineType:
+ return p.id
+ except Exception:
+ pass
+ return None
+
+
+def _createStripeProduct(stripe, name: str, description: str, planKey: str, lineType: str) -> str:
+ product = stripe.Product.create(
+ name=name,
+ description=description,
+ metadata={_METADATA_KEY: planKey, _METADATA_LINE_TYPE: lineType},
+ )
+ logger.info("Created Stripe Product %s: %s (%s/%s)", product.id, name, planKey, lineType)
+ return product.id
+
+
+def _findExistingStripePrice(stripe, productId: str, unitAmount: int, interval: str) -> Optional[str]:
+ try:
+ prices = stripe.Price.list(product=productId, active=True, limit=50)
+ for p in prices.data:
+ recurring = p.get("recurring") or {}
+ if p.get("unit_amount") == unitAmount and recurring.get("interval") == interval:
+ return p.id
+ except Exception:
+ pass
+ return None
+
+
+def _createStripePrice(stripe, productId: str, unitAmountCHF: float, interval: str, nickname: str) -> str:
+ price = stripe.Price.create(
+ product=productId,
+ unit_amount=int(unitAmountCHF * 100),
+ currency="chf",
+ recurring={"interval": interval},
+ nickname=nickname,
+ )
+ logger.info("Created Stripe Price %s (%s, %s CHF/%s)", price.id, nickname, unitAmountCHF, interval)
+ return price.id
+
+
+def bootstrapStripePrices() -> None:
+ """Ensure all paid plans have separate Stripe Products for users and instances."""
+ try:
+ from modules.shared.stripeClient import getStripeClient
+ stripe = getStripeClient()
+ except ValueError as e:
+ logger.error("Stripe not configured — cannot bootstrap subscription prices: %s", e)
+ return
+
+ db = _getBillingDb()
+ existing = _loadExistingMappings(db)
+
+ for planKey, plan in BUILTIN_PLANS.items():
+ if plan.billingPeriod == BillingPeriodEnum.NONE:
+ continue
+ if plan.pricePerUserCHF == 0 and plan.pricePerFeatureInstanceCHF == 0:
+ continue
+
+ stripePeriod = _PERIOD_TO_STRIPE.get(plan.billingPeriod)
+ if not stripePeriod:
+ continue
+
+ interval = stripePeriod["interval"]
+
+ if planKey in existing:
+ mapping = existing[planKey]
+ hasAllPrices = mapping.stripePriceIdUsers and mapping.stripePriceIdInstances
+ hasAllProducts = mapping.stripeProductIdUsers and mapping.stripeProductIdInstances
+ if hasAllPrices and hasAllProducts:
+ logger.debug("Stripe prices already configured for plan %s", planKey)
+ continue
+
+ productIdUsers = None
+ productIdInstances = None
+ priceIdUsers = None
+ priceIdInstances = None
+
+ if plan.pricePerUserCHF > 0:
+ productIdUsers = _findStripeProduct(stripe, planKey, "users")
+ if not productIdUsers:
+ productIdUsers = _createStripeProduct(
+ stripe, "Benutzer-Lizenzen", f"Benutzer-Lizenzen für {plan.title.get('de', planKey)}",
+ planKey, "users",
+ )
+ priceIdUsers = _findExistingStripePrice(stripe, productIdUsers, int(plan.pricePerUserCHF * 100), interval)
+ if not priceIdUsers:
+ priceIdUsers = _createStripePrice(
+ stripe, productIdUsers, plan.pricePerUserCHF, interval, f"{planKey} — Benutzer-Lizenz",
+ )
+
+ if plan.pricePerFeatureInstanceCHF > 0:
+ productIdInstances = _findStripeProduct(stripe, planKey, "instances")
+ if not productIdInstances:
+ productIdInstances = _createStripeProduct(
+ stripe, "Feature-Instanzen", f"Feature-Instanzen für {plan.title.get('de', planKey)}",
+ planKey, "instances",
+ )
+ priceIdInstances = _findExistingStripePrice(
+ stripe, productIdInstances, int(plan.pricePerFeatureInstanceCHF * 100), interval,
+ )
+ if not priceIdInstances:
+ priceIdInstances = _createStripePrice(
+ stripe, productIdInstances, plan.pricePerFeatureInstanceCHF, interval,
+ f"{planKey} — Feature-Instanz",
+ )
+
+ persistData = {
+ "stripeProductId": "",
+ "stripeProductIdUsers": productIdUsers,
+ "stripeProductIdInstances": productIdInstances,
+ "stripePriceIdUsers": priceIdUsers,
+ "stripePriceIdInstances": priceIdInstances,
+ }
+
+ if planKey in existing:
+ db.recordModify(StripePlanPrice, existing[planKey].id, persistData)
+ else:
+ db.recordCreate(StripePlanPrice, StripePlanPrice(planKey=planKey, **persistData).model_dump())
+
+ logger.info(
+ "Stripe bootstrapped for %s: users=%s/%s, instances=%s/%s",
+ planKey, productIdUsers, priceIdUsers, productIdInstances, priceIdInstances,
+ )
+
+
+def getStripePricesForPlan(planKey: str) -> Optional[StripePlanPrice]:
+ """Load the persisted Stripe IDs for a plan."""
+ try:
+ db = _getBillingDb()
+ rows = db.getRecordset(StripePlanPrice, recordFilter={"planKey": planKey})
+ if rows:
+ return StripePlanPrice(**{k: v for k, v in rows[0].items() if not k.startswith("_")})
+ except Exception as e:
+ logger.error("Error loading Stripe prices for plan %s: %s", planKey, e)
+ return None
diff --git a/modules/shared/configuration.py b/modules/shared/configuration.py
index 73e2da90..721ce448 100644
--- a/modules/shared/configuration.py
+++ b/modules/shared/configuration.py
@@ -66,7 +66,7 @@ class Configuration:
self._configMtime = currentMtime
try:
- with open(configPath, 'r') as f:
+ with open(configPath, 'r', encoding='utf-8') as f:
lines = f.readlines()
i = 0
diff --git a/modules/shared/notifyMandateAdmins.py b/modules/shared/notifyMandateAdmins.py
new file mode 100644
index 00000000..27445afb
--- /dev/null
+++ b/modules/shared/notifyMandateAdmins.py
@@ -0,0 +1,285 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Central notification utility for mandate administrators.
+
+All mandate-level notifications (subscription changes, billing warnings, etc.)
+MUST go through notifyMandateAdmins() to ensure consistent recipient resolution
+and delivery.
+
+Recipients are the union of:
+1. BillingSettings.notifyEmails for the mandate (configured contact addresses)
+2. All users with the mandate-level "admin" RBAC role
+"""
+
+from __future__ import annotations
+
+import html
+import json
+import logging
+from typing import Any, Dict, List, Optional, Set
+
+from modules.datamodels.datamodelMessaging import MessagingChannel
+from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface
+
+logger = logging.getLogger(__name__)
+
+
+# ============================================================================
+# Recipient resolution
+# ============================================================================
+
+
+def _normalizeEmailList(raw: Any) -> List[str]:
+ """Parse notifyEmails which can be a list, JSON string, or single address."""
+ if raw is None:
+ return []
+ if isinstance(raw, list):
+ return [str(e).strip().lower() for e in raw if str(e).strip()]
+ if isinstance(raw, str):
+ s = raw.strip()
+ if not s:
+ return []
+ if s.startswith("["):
+ try:
+ parsed = json.loads(s)
+ if isinstance(parsed, list):
+ return [str(e).strip().lower() for e in parsed if str(e).strip()]
+ except Exception:
+ pass
+ return [s.lower()]
+ return []
+
+
+def _resolveMandateContactEmails(mandateId: str) -> List[str]:
+ """Get the configured notifyEmails from BillingSettings."""
+ try:
+ from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
+ from modules.security.rootAccess import getRootUser
+ billingIf = getBillingInterface(getRootUser(), mandateId)
+ settings = billingIf.getSettings(mandateId) or {}
+ return _normalizeEmailList(settings.get("notifyEmails"))
+ except Exception as e:
+ logger.warning("Could not resolve BillingSettings.notifyEmails for mandate %s: %s", mandateId, e)
+ return []
+
+
+def _resolveMandateAdminEmails(mandateId: str) -> List[str]:
+ """Resolve all admin users of a mandate via RBAC and return their emails."""
+ emails: List[str] = []
+ try:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ rootIf = getRootInterface()
+ userMandates = rootIf.getUserMandatesByMandate(mandateId)
+ for um in userMandates:
+ if not getattr(um, "enabled", True):
+ continue
+ umId = str(getattr(um, "id", ""))
+ userId = getattr(um, "userId", None)
+ if not userId:
+ continue
+ roleIds = rootIf.getRoleIdsForUserMandate(umId)
+ isAdmin = False
+ for roleId in roleIds:
+ role = rootIf.getRole(roleId)
+ if role and role.roleLabel == "admin" and not role.featureInstanceId:
+ isAdmin = True
+ break
+ if not isAdmin:
+ continue
+ user = rootIf.getUser(str(userId))
+ if user and user.email:
+ emails.append(user.email.strip().lower())
+ except Exception as e:
+ logger.warning("Could not resolve admin emails for mandate %s: %s", mandateId, e)
+ return emails
+
+
+def _resolveAllRecipients(mandateId: str) -> List[str]:
+ """Union of BillingSettings.notifyEmails + all mandate admin user emails, deduplicated."""
+ seen: Set[str] = set()
+ result: List[str] = []
+ for email in _resolveMandateContactEmails(mandateId) + _resolveMandateAdminEmails(mandateId):
+ if email and email not in seen:
+ seen.add(email)
+ result.append(email)
+ return result
+
+
+# ============================================================================
+# Mandate name resolution
+# ============================================================================
+
+
+def _resolveMandateName(mandateId: str) -> str:
+ """Return the human-readable mandate name (label or name), falling back to a short ID."""
+ try:
+ from modules.datamodels.datamodelUam import Mandate
+ from modules.security.rootAccess import getRootDbAppConnector
+ appDb = getRootDbAppConnector()
+ rows = appDb.getRecordset(Mandate, recordFilter={"id": mandateId})
+ if rows:
+ return rows[0].get("label") or rows[0].get("name") or mandateId[:8]
+ except Exception as e:
+ logger.warning("Could not resolve mandate name for %s: %s", mandateId, e)
+ return mandateId[:8]
+
+
+# ============================================================================
+# HTML email rendering
+# ============================================================================
+
+
+def _getOperatorInfo() -> Dict[str, str]:
+ """Load operator company data from config.ini."""
+ try:
+ from modules.shared.configuration import APP_CONFIG
+ return {
+ "companyName": APP_CONFIG.get("Operator_CompanyName", ""),
+ "address": APP_CONFIG.get("Operator_Address", ""),
+ "vatNumber": APP_CONFIG.get("Operator_VatNumber", ""),
+ }
+ except Exception:
+ return {"companyName": "", "address": "", "vatNumber": ""}
+
+
+def _renderHtmlEmail(
+ headline: str,
+ bodyParagraphs: List[str],
+ mandateName: str,
+ footerNote: Optional[str] = None,
+ rawHtmlBlock: Optional[str] = None,
+) -> str:
+ """Render a clean, professional HTML notification email.
+
+ Args:
+ rawHtmlBlock: Optional pre-formatted HTML inserted after bodyParagraphs (e.g. invoice table).
+ """
+ hl = html.escape(headline)
+ mn = html.escape(mandateName)
+
+ paragraphsHtml = ""
+ for p in bodyParagraphs:
+ escaped = html.escape(p).replace("\n", "
")
+ paragraphsHtml += f'{escaped}
\n'
+
+ rawBlock = ""
+ if rawHtmlBlock:
+ rawBlock = f'{rawHtmlBlock}
\n'
+
+ footer = ""
+ if footerNote:
+ footer = (
+ f''
+ f'{html.escape(footerNote)}
\n'
+ )
+
+ operator = _getOperatorInfo()
+ operatorLine = ""
+ parts = [p for p in [operator["companyName"], operator["address"], operator["vatNumber"]] if p]
+ if parts:
+ operatorLine = (
+ f''
+ f'{html.escape(" | ".join(parts))}
\n'
+ )
+
+ return f"""
+
+
+
+
+
+
+
+
+ PowerOn
+ |
+
+
+ {hl}
+ Mandant: {mn}
+
+ {paragraphsHtml}
+ {rawBlock}
+
+ {footer}
+ |
+
+ |
+
+ Diese E-Mail wurde automatisch von PowerOn versendet.
+
+ {operatorLine}
+ |
+
+ |
+
+
+"""
+
+
+# ============================================================================
+# Public API
+# ============================================================================
+
+
+def notifyMandateAdmins(
+ mandateId: str,
+ subject: str,
+ headline: str,
+ bodyParagraphs: List[str],
+ *,
+ footerNote: Optional[str] = None,
+ rawHtmlBlock: Optional[str] = None,
+) -> int:
+ """
+ Send a styled HTML notification to all mandate admins and configured contacts.
+
+ Args:
+ mandateId: The mandate to notify admins for.
+ subject: Email subject line.
+ headline: Bold headline inside the email body.
+ bodyParagraphs: List of paragraph strings (plain text, auto-escaped).
+ footerNote: Optional small-print note below the main content.
+ rawHtmlBlock: Optional pre-formatted HTML block (e.g. invoice summary table).
+
+ Returns:
+ Number of recipients that were successfully notified.
+ """
+ if not mandateId:
+ return 0
+
+ recipients = _resolveAllRecipients(mandateId)
+ if not recipients:
+ logger.warning(
+ "notifyMandateAdmins: no recipients found for mandate %s "
+ "(no notifyEmails configured and no admin users with email)",
+ mandateId,
+ )
+ return 0
+
+ mandateName = _resolveMandateName(mandateId)
+ htmlMessage = _renderHtmlEmail(headline, bodyParagraphs, mandateName, footerNote, rawHtmlBlock)
+ messaging = getMessagingInterface()
+ successCount = 0
+
+ for to in recipients:
+ try:
+ ok = messaging.send(
+ channel=MessagingChannel.EMAIL,
+ recipient=to,
+ subject=subject,
+ message=htmlMessage,
+ )
+ if ok:
+ successCount += 1
+ else:
+ logger.warning("notifyMandateAdmins: send failed for %s", to)
+ except Exception as e:
+ logger.error("notifyMandateAdmins: error sending to %s: %s", to, e)
+
+ logger.info(
+ "notifyMandateAdmins: sent '%s' to %d/%d recipients for mandate %s (%s)",
+ subject, successCount, len(recipients), mandateId, mandateName,
+ )
+ return successCount
diff --git a/modules/shared/stripeClient.py b/modules/shared/stripeClient.py
new file mode 100644
index 00000000..9c7b4c67
--- /dev/null
+++ b/modules/shared/stripeClient.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Central Stripe SDK initialization.
+
+All Stripe interactions MUST use getStripeClient() to ensure consistent
+API key, API version, and fallback handling across billing and subscription flows.
+"""
+
+import logging
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+_stripeInitialized = False
+
+
+def getStripeClient():
+ """
+ Initialize and return the configured Stripe SDK module.
+
+ Raises ValueError if no Stripe secret key is configured.
+ """
+ import stripe
+ from modules.shared.configuration import APP_CONFIG
+
+ apiVersion = APP_CONFIG.get("STRIPE_API_VERSION")
+ if apiVersion:
+ stripe.api_version = apiVersion
+
+ secretKey = APP_CONFIG.get("STRIPE_SECRET_KEY_SECRET") or APP_CONFIG.get("STRIPE_SECRET_KEY")
+ if not secretKey:
+ raise ValueError("STRIPE_SECRET_KEY_SECRET not configured")
+
+ stripe.api_key = secretKey
+ return stripe
+
+
diff --git a/modules/system/mainSystem.py b/modules/system/mainSystem.py
index a4b92ca4..58667645 100644
--- a/modules/system/mainSystem.py
+++ b/modules/system/mainSystem.py
@@ -190,7 +190,6 @@ NAVIGATION_SECTIONS = [
"path": "/admin/billing",
"order": 40,
"adminOnly": True,
- "sysAdminOnly": True,
},
],
},