# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Subscription Service — state-machine-based lifecycle management. Every mutation takes an explicit subscriptionId. No status-scan guessing. See wiki/concepts/Subscription-State-Machine.md for the full state machine. """ import logging import time from typing import Dict, Any, List, Optional from datetime import datetime, timezone, timedelta from modules.datamodels.datamodelUam import User from modules.datamodels.datamodelSubscription import ( SubscriptionPlan, MandateSubscription, SubscriptionStatusEnum, BillingPeriodEnum, OPERATIVE_STATUSES, getPlan, _getSelectablePlans, ) from modules.interfaces.interfaceDbSubscription import ( getInterface as getSubscriptionInterface, InvalidTransitionError, ) from modules.shared.i18nRegistry import t logger = logging.getLogger(__name__) SUBSCRIPTION_CACHE_TTL_SECONDS = 60 _STALE_PENDING_SECONDS = 30 * 60 _subscriptionServices: Dict[str, "SubscriptionService"] = {} _statusCache: Dict[str, tuple] = {} def getService(currentUser: User, mandateId: str) -> "SubscriptionService": cacheKey = f"{currentUser.id}_{mandateId}" if cacheKey not in _subscriptionServices: _subscriptionServices[cacheKey] = SubscriptionService(currentUser, mandateId) else: _subscriptionServices[cacheKey].setContext(currentUser, mandateId) return _subscriptionServices[cacheKey] class SubscriptionService: """State-machine-based subscription service. All mutations use explicit subscriptionId. No scan-based writes.""" def __init__(self, contextOrUser, mandateId=None, get_service=None): if mandateId is not None and callable(mandateId): ctx = contextOrUser self.currentUser = ctx.user self.mandateId = ctx.mandate_id or "" elif get_service is not None and hasattr(contextOrUser, "user"): ctx = contextOrUser self.currentUser = ctx.user self.mandateId = ctx.mandate_id or "" else: self.currentUser = contextOrUser self.mandateId = mandateId or "" self._interface = getSubscriptionInterface(self.currentUser, self.mandateId) def setContext(self, currentUser: User, mandateId: str): self.currentUser = currentUser self.mandateId = mandateId self._interface = getSubscriptionInterface(currentUser, mandateId) # ========================================================================= # Billing gate (cached, read-only) # ========================================================================= def assertActive(self, mandateId: str = None) -> SubscriptionStatusEnum: """Return subscription status for billing decisions. Uses TTL cache. This is the ONLY method that works by mandateId (read-only).""" mid = mandateId or self.mandateId now = time.monotonic() cached = _statusCache.get(mid) if cached and cached[1] > now: return cached[0] status = self._interface.assertActive(mid) _statusCache[mid] = (status, now + SUBSCRIPTION_CACHE_TTL_SECONDS) return status def invalidateCache(self, mandateId: str = None): mid = mandateId or self.mandateId _statusCache.pop(mid, None) # ========================================================================= # Capacity (delegation) # ========================================================================= def assertCapacity(self, mandateId: str, resourceType: str, delta: int = 1) -> bool: return self._interface.assertCapacity(mandateId or self.mandateId, resourceType, delta) # ========================================================================= # Read operations # ========================================================================= def getById(self, subscriptionId: str) -> Optional[Dict[str, Any]]: return self._interface.getById(subscriptionId) def getOperativeSubscription(self, mandateId: str = None) -> Optional[Dict[str, Any]]: return self._interface.getOperativeForMandate(mandateId or self.mandateId) def getScheduledSubscription(self, mandateId: str = None) -> Optional[Dict[str, Any]]: return self._interface.getScheduledForMandate(mandateId or self.mandateId) def listSubscriptions(self, mandateId: str = None, statusFilter=None) -> List[Dict[str, Any]]: return self._interface.listForMandate(mandateId or self.mandateId, statusFilter) def getSelectablePlans(self) -> List[SubscriptionPlan]: return _getSelectablePlans() def getPlan(self, planKey: str) -> Optional[SubscriptionPlan]: return getPlan(planKey) # ========================================================================= # T1/T2: Plan activation (creates PENDING, returns checkout URL) # ========================================================================= def activatePlan(self, mandateId: str, planKey: str, returnUrl: str) -> Dict[str, Any]: """Create a new subscription as PENDING and start the checkout flow. - Free/trial plans: immediately ACTIVE/TRIALING (no checkout). - Paid plans with active predecessor: PENDING -> checkout -> SCHEDULED on confirmation. - Paid plans without predecessor: PENDING -> checkout -> ACTIVE on confirmation. Cleans up any existing PENDING/SCHEDULED for this mandate first (by ID).""" mid = mandateId or self.mandateId plan = getPlan(planKey) if not plan: raise ValueError(f"Unknown plan: {planKey}") isPaid = plan.billingPeriod != BillingPeriodEnum.NONE and not plan.trialDays currentOperative = self._interface.getOperativeForMandate(mid) self._cleanupPreparatorySubscriptions(mid) now = datetime.now(timezone.utc) nowTs = now.timestamp() if plan.trialDays: initialStatus = SubscriptionStatusEnum.TRIALING elif isPaid: initialStatus = SubscriptionStatusEnum.PENDING else: initialStatus = SubscriptionStatusEnum.ACTIVE sub = MandateSubscription( mandateId=mid, planKey=planKey, status=initialStatus, recurring=plan.autoRenew and not plan.trialDays, startedAt=nowTs, currentPeriodStart=nowTs, snapshotPricePerUserCHF=plan.pricePerUserCHF, snapshotPricePerInstanceCHF=plan.pricePerFeatureInstanceCHF, ) if plan.trialDays: sub.trialEndsAt = (now + timedelta(days=plan.trialDays)).timestamp() if plan.billingPeriod == BillingPeriodEnum.MONTHLY: sub.currentPeriodEnd = (now + timedelta(days=30)).timestamp() elif plan.billingPeriod == BillingPeriodEnum.YEARLY: sub.currentPeriodEnd = (now + timedelta(days=365)).timestamp() created = self._interface.createSubscription(sub) from urllib.parse import urlparse parsed = urlparse(returnUrl) if returnUrl else None pUrl = f"{parsed.scheme}://{parsed.netloc}" if parsed and parsed.scheme else "" if isPaid: try: checkoutUrl = self._createCheckoutSession(mid, plan, created, currentOperative, returnUrl) created["redirectUrl"] = checkoutUrl except Exception as e: logger.exception( "Checkout creation failed for mandate %s, plan %s — force-expiring PENDING %s", mid, planKey, created["id"], ) self._interface.forceExpire(created["id"]) self.invalidateCache(mid) raise ValueError(f"Subscription konnte nicht erstellt werden: {e}") from e else: if currentOperative: self._expireOperative(currentOperative["id"], mid) _notifySubscriptionChange(mid, "activated", plan, subscriptionRecord=created, platformUrl=pUrl) self.invalidateCache(mid) return created def _cleanupPreparatorySubscriptions(self, mandateId: str) -> None: """Expire any existing PENDING or SCHEDULED subscriptions for this mandate (by ID).""" preparatory = self._interface.listForMandate( mandateId, [SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.SCHEDULED], ) for sub in preparatory: subId = sub["id"] currentStatus = SubscriptionStatusEnum(sub["status"]) stripeSubId = sub.get("stripeSubscriptionId") if stripeSubId and currentStatus == SubscriptionStatusEnum.SCHEDULED: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() stripe.Subscription.cancel(stripeSubId) except Exception as e: logger.error("Failed to cancel Stripe sub %s during cleanup: %s", stripeSubId, e) self._interface.transitionStatus(subId, currentStatus, SubscriptionStatusEnum.EXPIRED) logger.info("Cleaned up %s subscription %s for mandate %s", currentStatus.value, subId, mandateId) def _expireOperative(self, subscriptionId: str, mandateId: str) -> None: """Expire the current operative subscription (used when a free/trial plan replaces it).""" sub = self._interface.getById(subscriptionId) if not sub: return currentStatus = SubscriptionStatusEnum(sub["status"]) if currentStatus in OPERATIVE_STATUSES: stripeSubId = sub.get("stripeSubscriptionId") if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() stripe.Subscription.cancel(stripeSubId) except Exception as e: logger.error("Failed to cancel Stripe sub %s: %s", stripeSubId, e) self._interface.transitionStatus(subscriptionId, currentStatus, SubscriptionStatusEnum.EXPIRED) def _createCheckoutSession( self, mandateId: str, plan: SubscriptionPlan, subRecord: Dict[str, Any], currentOperative: Optional[Dict[str, Any]], returnUrl: str, ) -> str: """Create a Stripe Checkout Session. If a predecessor exists, delays billing via trial_end to start after the predecessor's period end.""" from modules.shared.stripeClient import getStripeClient from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import getStripePricesForPlan stripe = getStripeClient() priceMapping = getStripePricesForPlan(plan.planKey) if not priceMapping or (not priceMapping.stripePriceIdUsers and not priceMapping.stripePriceIdInstances): raise ValueError(f"Stripe Price IDs not provisioned for plan {plan.planKey}") # Defense in depth: if either of the persisted Stripe Price IDs has been # archived in Stripe in the meantime (e.g. another environment's bootstrap # rotated them on a shared Stripe account), the upcoming # ``checkout.Session.create`` would fail with "The price specified is # inactive". Trigger a one-shot bootstrap re-run to rotate inactive prices, # then reload the mapping. This is idempotent and cheap when nothing # changed. if not self._areStripePricesActive(stripe, priceMapping): logger.warning( "Stripe Price(s) for plan %s are no longer active in Stripe — " "running bootstrap to rotate.", plan.planKey, ) try: from modules.serviceCenter.services.serviceSubscription.stripeBootstrap import bootstrapStripePrices bootstrapStripePrices() priceMapping = getStripePricesForPlan(plan.planKey) except Exception as ex: logger.error("Inline Stripe bootstrap failed for plan %s: %s", plan.planKey, ex) if not priceMapping or not self._areStripePricesActive(stripe, priceMapping): raise ValueError( f"Stripe Price IDs for plan {plan.planKey} are inactive and " "could not be rotated automatically." ) stripeCustomerId = self._resolveStripeCustomer(mandateId) if not stripeCustomerId: raise ValueError(f"Could not resolve Stripe customer for mandate {mandateId}") activeUsers = self._interface.countActiveUsers(mandateId) activeInstances = self._interface.countActiveFeatureInstances(mandateId) billableModules = max(0, activeInstances - plan.includedModules) lineItems = [] if priceMapping.stripePriceIdUsers: lineItems.append({"price": priceMapping.stripePriceIdUsers, "quantity": max(activeUsers, 1)}) if priceMapping.stripePriceIdInstances and billableModules > 0: lineItems.append({"price": priceMapping.stripePriceIdInstances, "quantity": billableModules}) if not returnUrl: raise ValueError("returnUrl is required for paid subscription checkout") from urllib.parse import urlparse parsedReturn = urlparse(returnUrl) platformUrl = f"{parsedReturn.scheme}://{parsedReturn.netloc}" if parsedReturn.scheme else "" separator = "&" if "?" in returnUrl else "?" successUrl = f"{returnUrl}{separator}success=true&session_id={{CHECKOUT_SESSION_ID}}" cancelUrl = f"{returnUrl}{separator}canceled=true" subscriptionData: Dict[str, Any] = { "metadata": { "mandateId": mandateId, "subscriptionRecordId": subRecord["id"], "planKey": plan.planKey, "platformUrl": platformUrl, }, } isTrialPredecessor = ( currentOperative is not None and currentOperative.get("status") == SubscriptionStatusEnum.TRIALING.value ) if currentOperative and currentOperative.get("currentPeriodEnd") and not isTrialPredecessor: periodEnd = currentOperative["currentPeriodEnd"] subscriptionData["trial_end"] = int(periodEnd) self._interface.updateFields(subRecord["id"], {"effectiveFrom": periodEnd}) session = None for attempt in range(2): try: session = stripe.checkout.Session.create( mode="subscription", customer=stripeCustomerId, line_items=lineItems, success_url=successUrl, cancel_url=cancelUrl, subscription_data=subscriptionData, ) break except Exception as e: if attempt == 0 and self._isStripeMissingCustomerError(e): logger.warning( "Stripe reports missing customer %s for mandate %s — " "clearing stored stripeCustomerId (wrong account, deleted customer, or copied DB).", stripeCustomerId, mandateId, ) self._clearStoredStripeCustomerId(mandateId) stripeCustomerId = self._resolveStripeCustomer(mandateId) if not stripeCustomerId: raise ValueError( f"Could not recreate Stripe customer for mandate {mandateId}" ) from e continue raise if not session or not session.url: raise ValueError("Stripe Checkout Session creation failed") logger.info("Checkout session %s created for mandate %s, plan %s", session.id, mandateId, plan.planKey) return session.url @staticmethod def _isStripeMissingCustomerError(exc: BaseException) -> bool: code = getattr(exc, "code", None) param = getattr(exc, "param", None) if code == "resource_missing" and param == "customer": return True body = getattr(exc, "json_body", None) if isinstance(body, dict): err = body.get("error") if isinstance(err, dict): return err.get("code") == "resource_missing" and err.get("param") == "customer" return False def _clearStoredStripeCustomerId(self, mandateId: str) -> None: try: from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface billingIf = getBillingInterface(self.currentUser, mandateId) settings = billingIf.getSettings(mandateId) if not settings or not settings.get("stripeCustomerId"): return billingIf.updateSettings(settings["id"], {"stripeCustomerId": None}) logger.info("Cleared stripeCustomerId on billing settings for mandate %s", mandateId) except Exception as e: logger.error("Failed to clear stripeCustomerId for mandate %s: %s", mandateId, e) def _areStripePricesActive(self, stripe, priceMapping) -> bool: """Verify that every persisted Stripe Price ID for the plan is still ``active`` in Stripe. ``stripe.Price.retrieve`` returns archived prices too, so we must inspect the ``active`` flag explicitly. Returns True only when ALL non-empty price IDs resolve to active prices.""" priceIds = [pid for pid in ( getattr(priceMapping, "stripePriceIdUsers", None), getattr(priceMapping, "stripePriceIdInstances", None), ) if pid] if not priceIds: return False for pid in priceIds: try: price = stripe.Price.retrieve(pid) if not bool(getattr(price, "active", False) if not isinstance(price, dict) else price.get("active")): return False except Exception as ex: logger.warning("Stripe Price %s could not be retrieved: %s", pid, ex) return False return True def _resolveStripeCustomer(self, mandateId: str) -> Optional[str]: try: from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface billingIf = getBillingInterface(self.currentUser, mandateId) settings = billingIf.getSettings(mandateId) if not settings: return None customerId = settings.get("stripeCustomerId") if customerId: return customerId from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() mandateLabel = mandateId try: from modules.datamodels.datamodelUam import Mandate from modules.security.rootAccess import getRootDbAppConnector appDb = getRootDbAppConnector() rows = appDb.getRecordset(Mandate, recordFilter={"id": mandateId}) if rows: mandateLabel = rows[0].get("label") or rows[0].get("name") or mandateId except Exception: pass customer = stripe.Customer.create(name=mandateLabel, metadata={"mandateId": mandateId}) billingIf.updateSettings(settings["id"], {"stripeCustomerId": customer.id}) logger.info("Stripe customer %s created for mandate %s", customer.id, mandateId) return customer.id except Exception as e: logger.error("_resolveStripeCustomer(%s) failed: %s", mandateId, e) return None # ========================================================================= # T7: Cancel (set recurring=false) # ========================================================================= def cancelSubscription(self, subscriptionId: str) -> Dict[str, Any]: """Cancel a subscription (T7: set recurring=false, Stripe cancel_at_period_end). The subscription stays ACTIVE until its period ends.""" sub = self._interface.getById(subscriptionId) if not sub: raise ValueError(f"Subscription {subscriptionId} not found") status = sub.get("status", "") mandateId = sub["mandateId"] if status == SubscriptionStatusEnum.PENDING.value: result = self._interface.transitionStatus( subscriptionId, SubscriptionStatusEnum.PENDING, SubscriptionStatusEnum.EXPIRED, ) self.invalidateCache(mandateId) return result if status == SubscriptionStatusEnum.SCHEDULED.value: stripeSubId = sub.get("stripeSubscriptionId") if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() stripe.Subscription.cancel(stripeSubId) except Exception as e: logger.error("Failed to cancel Stripe sub %s: %s", stripeSubId, e) result = self._interface.transitionStatus( subscriptionId, SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.EXPIRED, ) self.invalidateCache(mandateId) return result if status != SubscriptionStatusEnum.ACTIVE.value: raise ValueError(f"Cannot cancel subscription in status {status}") if not sub.get("recurring", True): raise ValueError("Subscription is already cancelled (non-recurring)") stripeSubId = sub.get("stripeSubscriptionId") pUrl = "" if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() from modules.shared.stripeClient import stripeToDict stripeSub = stripeToDict(stripe.Subscription.modify(stripeSubId, cancel_at_period_end=True)) pUrl = (stripeSub.get("metadata") or {}).get("platformUrl", "") except Exception as e: logger.error("Failed to set cancel_at_period_end for %s: %s", stripeSubId, e) result = self._interface.updateFields(subscriptionId, {"recurring": False}) self.invalidateCache(mandateId) plan = getPlan(sub.get("planKey", "")) _notifySubscriptionChange(mandateId, "cancelled", plan, subscriptionRecord=sub, platformUrl=pUrl) return result # ========================================================================= # T8: Reactivate (set recurring=true) # ========================================================================= def reactivateSubscription(self, subscriptionId: str) -> Dict[str, Any]: """Reactivate a cancelled subscription before its period ends (T8: recurring=true).""" sub = self._interface.getById(subscriptionId) if not sub: raise ValueError(f"Subscription {subscriptionId} not found") if sub.get("status") != SubscriptionStatusEnum.ACTIVE.value: raise ValueError(f"Can only reactivate ACTIVE subscriptions, got {sub.get('status')}") if sub.get("recurring", True): raise ValueError("Subscription is already recurring") periodEnd = sub.get("currentPeriodEnd") if periodEnd: if periodEnd <= datetime.now(timezone.utc).timestamp(): raise ValueError("Cannot reactivate — period has already ended") stripeSubId = sub.get("stripeSubscriptionId") if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() stripe.Subscription.modify(stripeSubId, cancel_at_period_end=False) except Exception as e: logger.error("Failed to reactivate Stripe sub %s: %s", stripeSubId, e) result = self._interface.updateFields(subscriptionId, {"recurring": True}) self.invalidateCache(sub["mandateId"]) return result # ========================================================================= # T13: Sysadmin force-cancel # ========================================================================= def forceCancel(self, subscriptionId: str) -> Dict[str, Any]: """Sysadmin force-cancel: immediately expire any non-terminal subscription.""" sub = self._interface.getById(subscriptionId) if not sub: raise ValueError(f"Subscription {subscriptionId} not found") stripeSubId = sub.get("stripeSubscriptionId") pUrl = "" if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() from modules.shared.stripeClient import stripeToDict stripeSub = stripeToDict(stripe.Subscription.retrieve(stripeSubId)) pUrl = (stripeSub.get("metadata") or {}).get("platformUrl", "") stripe.Subscription.cancel(stripeSubId) except Exception as e: logger.error("Failed to cancel Stripe sub %s: %s", stripeSubId, e) result = self._interface.forceExpire(subscriptionId) mandateId = sub["mandateId"] self.invalidateCache(mandateId) plan = getPlan(sub.get("planKey", "")) _notifySubscriptionChange(mandateId, "force_cancelled", plan, subscriptionRecord=sub, platformUrl=pUrl) return result # ========================================================================= # T6: Trial expiry # ========================================================================= def handleTrialExpiry(self, subscriptionId: str) -> None: """Expire a trial subscription (T6: TRIALING -> EXPIRED).""" sub = self._interface.getById(subscriptionId) if not sub or sub.get("status") != SubscriptionStatusEnum.TRIALING.value: return self._interface.transitionStatus( subscriptionId, SubscriptionStatusEnum.TRIALING, SubscriptionStatusEnum.EXPIRED, ) self.invalidateCache(sub["mandateId"]) plan = getPlan(sub.get("planKey", "")) successorPlan = getPlan(plan.successorPlanKey) if plan and plan.successorPlanKey else None _notifySubscriptionChange(sub["mandateId"], "trial_expired", successorPlan) logger.info("Trial expired for subscription %s", subscriptionId) # ========================================================================= # Stripe quantity sync # ========================================================================= def syncStripeQuantity(self, subscriptionId: str): self._interface.syncQuantityToStripe(subscriptionId) # ========================================================================= # Enterprise subscription management (sysadmin-only) # ========================================================================= def createEnterprise( self, mandateId: str, startDate: float, endDate: float, autoRenew: bool, flatPriceCHF: float, maxUsers: Optional[int], maxFeatureInstances: Optional[int], maxDataVolumeMB: Optional[int], budgetAiCHF: Optional[float], note: Optional[str] = None, ) -> Dict[str, Any]: """Create a new enterprise subscription with custom flat pricing and limits. 1. Cleanup PENDING/SCHEDULED predecessors 2. Expire current operative subscription (no Stripe cancel) 3. Create ACTIVE MandateSubscription with enterprise fields 4. Credit fixed AI budget to mandate pool 5. Send invoice email to mandate admins """ self._cleanupPreparatorySubscriptions(mandateId) currentOperative = self._interface.getOperativeForMandate(mandateId) if currentOperative: self._expireOperative(currentOperative["id"], mandateId) sub = MandateSubscription( mandateId=mandateId, planKey="ENTERPRISE", status=SubscriptionStatusEnum.ACTIVE, recurring=autoRenew, startedAt=datetime.now(timezone.utc).timestamp(), currentPeriodStart=startDate, currentPeriodEnd=endDate, isEnterprise=True, enterpriseFlatPriceCHF=flatPriceCHF, enterpriseMaxUsers=maxUsers, enterpriseMaxFeatureInstances=maxFeatureInstances, enterpriseMaxDataVolumeMB=maxDataVolumeMB, enterpriseBudgetAiCHF=budgetAiCHF, enterpriseNote=note, ) created = self._interface.createSubscription(sub) self.invalidateCache(mandateId) self._creditEnterpriseBudget(mandateId, budgetAiCHF, "Erstaktivierung") _notifyEnterpriseInvoice(mandateId, created) logger.info("Enterprise subscription created for mandate %s: id=%s", mandateId, created["id"]) return created def renewEnterprise( self, subscriptionId: str, newEndDate: float, overrides: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Renew an enterprise subscription: expire old, create new with same or overridden params. 1. Load + validate old subscription 2. Expire old subscription 3. Create new ACTIVE subscription (clone params, apply overrides) 4. Credit AI budget 5. Send invoice email """ oldSub = self._interface.getById(subscriptionId) if not oldSub: raise ValueError(f"Subscription {subscriptionId} not found") if not oldSub.get("isEnterprise"): raise ValueError(f"Subscription {subscriptionId} is not an enterprise subscription") mandateId = oldSub["mandateId"] self._interface.forceExpire(subscriptionId) self.invalidateCache(mandateId) overrides = overrides or {} nowTs = datetime.now(timezone.utc).timestamp() startDate = nowTs autoRenew = overrides.get("autoRenew", oldSub.get("recurring", False)) flatPriceCHF = overrides.get("flatPriceCHF", oldSub.get("enterpriseFlatPriceCHF")) maxUsers = overrides.get("maxUsers", oldSub.get("enterpriseMaxUsers")) maxFeatureInstances = overrides.get("maxFeatureInstances", oldSub.get("enterpriseMaxFeatureInstances")) maxDataVolumeMB = overrides.get("maxDataVolumeMB", oldSub.get("enterpriseMaxDataVolumeMB")) budgetAiCHF = overrides.get("budgetAiCHF", oldSub.get("enterpriseBudgetAiCHF")) note = overrides.get("note", oldSub.get("enterpriseNote")) sub = MandateSubscription( mandateId=mandateId, planKey="ENTERPRISE", status=SubscriptionStatusEnum.ACTIVE, recurring=autoRenew, startedAt=nowTs, currentPeriodStart=startDate, currentPeriodEnd=newEndDate, isEnterprise=True, enterpriseFlatPriceCHF=flatPriceCHF, enterpriseMaxUsers=maxUsers, enterpriseMaxFeatureInstances=maxFeatureInstances, enterpriseMaxDataVolumeMB=maxDataVolumeMB, enterpriseBudgetAiCHF=budgetAiCHF, enterpriseNote=note, ) created = self._interface.createSubscription(sub) self.invalidateCache(mandateId) self._creditEnterpriseBudget(mandateId, budgetAiCHF, "Erneuerung") _notifyEnterpriseInvoice(mandateId, created) logger.info( "Enterprise subscription renewed for mandate %s: old=%s -> new=%s", mandateId, subscriptionId, created["id"], ) return created def updateEnterprise(self, subscriptionId: str, changes: Dict[str, Any]) -> Dict[str, Any]: """Update enterprise subscription parameters (limits, note, flat price). Only enterprise-specific fields are allowed. No status change.""" sub = self._interface.getById(subscriptionId) if not sub: raise ValueError(f"Subscription {subscriptionId} not found") if not sub.get("isEnterprise"): raise ValueError(f"Subscription {subscriptionId} is not an enterprise subscription") allowedFields = { "enterpriseFlatPriceCHF", "enterpriseMaxUsers", "enterpriseMaxFeatureInstances", "enterpriseMaxDataVolumeMB", "enterpriseBudgetAiCHF", "enterpriseNote", "recurring", } updateData = {k: v for k, v in changes.items() if k in allowedFields} if not updateData: raise ValueError("No valid enterprise fields to update") result = self._interface.updateFields(subscriptionId, updateData) self.invalidateCache(sub["mandateId"]) logger.info("Enterprise subscription %s updated: %s", subscriptionId, list(updateData.keys())) return result def _creditEnterpriseBudget( self, mandateId: str, budgetAiCHF: Optional[float], periodLabel: str, ) -> None: if not budgetAiCHF or budgetAiCHF <= 0: return try: from modules.interfaces.interfaceDbBilling import getRootInterface as _getBillingRoot _getBillingRoot().creditSubscriptionBudget( mandateId, "ENTERPRISE", periodLabel=periodLabel, enterpriseBudgetOverride=budgetAiCHF, ) except Exception as e: logger.error("Enterprise budget credit failed for mandate %s: %s", mandateId, e) # ============================================================================ # Enterprise Invoice Email # ============================================================================ def _notifyEnterpriseInvoice(mandateId: str, subRecord: Dict[str, Any]) -> None: """Send enterprise invoice email to mandate admins.""" try: from modules.shared.notifyMandateAdmins import notifyMandateAdmins rawHtml = _buildEnterpriseInvoiceHtml(subRecord) flatPrice = subRecord.get("enterpriseFlatPriceCHF") or 0 notifyMandateAdmins( mandateId, t("[PowerOn] Enterprise-Abonnement — Rechnung") + f" (CHF {flatPrice:,.2f})", t("Enterprise-Abonnement — Rechnung"), [ t("Das Enterprise-Abonnement wurde aktiviert."), t("Bitte begleichen Sie den Rechnungsbetrag innert 10 Tagen."), t("Details zum Abonnement finden Sie unter Billing-Verwaltung."), ], rawHtmlBlock=rawHtml, ) except Exception as e: logger.error("Enterprise invoice email failed for mandate %s: %s", mandateId, e) def _buildEnterpriseInvoiceHtml(subRecord: Dict[str, Any]) -> str: """Build HTML invoice summary for enterprise subscription email.""" flatPrice = subRecord.get("enterpriseFlatPriceCHF") or 0 maxUsers = subRecord.get("enterpriseMaxUsers") maxFeatures = subRecord.get("enterpriseMaxFeatureInstances") maxStorageMB = subRecord.get("enterpriseMaxDataVolumeMB") budgetAi = subRecord.get("enterpriseBudgetAiCHF") note = subRecord.get("enterpriseNote") or "" periodStart = subRecord.get("currentPeriodStart") periodEnd = subRecord.get("currentPeriodEnd") def _chf(amount: float) -> str: return f"CHF {amount:,.2f}".replace(",", "'") def _fmtDate(ts: Optional[float]) -> str: if not ts: return "—" from datetime import datetime, timezone return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%d.%m.%Y") detailRows = "" if maxUsers is not None: detailRows += ( f'{t("Benutzer")}' f'max. {maxUsers}' ) if maxFeatures is not None: detailRows += ( f'{t("Module")}' f'max. {maxFeatures}' ) if maxStorageMB is not None: storageLabel = f"{maxStorageMB} MB" if maxStorageMB < 1024 else f"{maxStorageMB / 1024:.1f} GB" detailRows += ( f'{t("Datenvolumen")}' f'max. {storageLabel}' ) if budgetAi is not None and budgetAi > 0: detailRows += ( f'{t("KI-Budget")}' f'{_chf(budgetAi)}' ) noteHtml = "" if note: import html as htmlmod noteHtml = ( f'

' f'{t("Notiz")}: {htmlmod.escape(note)}

' ) return ( f'' f'' f'' f'' f'' f'' f'{detailRows}' f'' f'' f'' f'' f'' f'
{t("Zeitraum")}{_fmtDate(periodStart)} – {_fmtDate(periodEnd)}
{t("Pauschale")}' f'{_chf(flatPrice)}
' f'

' f'{t("Zahlungsfrist")}: {t("10 Tage")}

' f'{noteHtml}' ) # ============================================================================ # Notifications # ============================================================================ def _notifySubscriptionChange( mandateId: str, event: str, plan: Optional[SubscriptionPlan] = None, subscriptionRecord: Optional[Dict[str, Any]] = None, platformUrl: str = "", ) -> None: try: from modules.shared.notifyMandateAdmins import notifyMandateAdmins planLabel = (plan.title or plan.planKey) if plan else "\u2014" platformHint = f"Plattform: {platformUrl}" if platformUrl else "" rawHtmlBlock: Optional[str] = None if event == "activated" and plan and subscriptionRecord: rawHtmlBlock = _buildInvoiceSummaryHtml(plan, subscriptionRecord, mandateId, platformUrl) elif event in ("cancelled", "force_cancelled") and subscriptionRecord: rawHtmlBlock = _buildCancelSummaryHtml(subscriptionRecord, platformUrl) templates: Dict[str, Dict[str, Any]] = { "activated": { "subject": f"[PowerOn] {t('Abonnement aktiviert')} — {planLabel}", "headline": t("Abonnement aktiviert"), "paragraphs": [ p for p in [ t("Das Abonnement wurde auf den Plan «{planLabel}» aktiviert.").format(planLabel=planLabel), platformHint, t("Sie können Ihr Abonnement jederzeit unter Billing-Verwaltung › Abonnement einsehen und verwalten."), ] if p ], }, "cancelled": { "subject": f"[PowerOn] {t('Abonnement gekündigt')} — {planLabel}", "headline": t("Abonnement gekündigt"), "paragraphs": [ p for p in [ t("Das Abonnement «{planLabel}» wurde gekündigt.").format(planLabel=planLabel), platformHint, t("Die Kündigung wird zum Ende der aktuellen bezahlten Periode wirksam. Bis dahin bleibt der volle Zugang bestehen."), ] if p ], }, "force_cancelled": { "subject": f"[PowerOn] {t('Abonnement sofort beendet')} — {planLabel}", "headline": t("Abonnement sofort beendet"), "paragraphs": [ p for p in [ t("Das Abonnement «{planLabel}» wurde durch den Plattform-Administrator sofort beendet.").format(planLabel=planLabel), platformHint, t("Der Zugang wurde per sofort deaktiviert. Bei Fragen wenden Sie sich an den Plattform-Support."), ] if p ], }, "trial_expired": { "subject": f"[PowerOn] {t('Testphase abgelaufen')}", "headline": t("Testphase abgelaufen"), "paragraphs": [ p for p in [ t("Die kostenlose Testphase ist abgelaufen."), platformHint, t("Bitte wählen Sie einen Plan unter Billing-Verwaltung › Abonnement, damit der Zugang nicht unterbrochen wird."), ] if p ], }, "payment_failed": { "subject": f"[PowerOn] {t('Zahlung fehlgeschlagen')} — {planLabel}", "headline": t("Zahlung fehlgeschlagen"), "paragraphs": [ p for p in [ t("Die Zahlung für das Abonnement «{planLabel}» ist fehlgeschlagen.").format(planLabel=planLabel), platformHint, t("Bitte aktualisieren Sie Ihr Zahlungsmittel unter Billing-Verwaltung."), ] if p ], }, } tpl = templates.get(event, { "subject": f"[PowerOn] {t('Abonnement-Änderung')} — {planLabel}", "headline": t("Abonnement-Änderung"), "paragraphs": [t("Änderung am Abonnement «{planLabel}».").format(planLabel=planLabel)], }) notifyMandateAdmins( mandateId, tpl["subject"], tpl["headline"], tpl["paragraphs"], rawHtmlBlock=rawHtmlBlock, ) except Exception as e: logger.error("_notifySubscriptionChange failed for mandate %s event %s: %s", mandateId, event, e) def _buildInvoiceSummaryHtml( plan: SubscriptionPlan, subRecord: Dict[str, Any], mandateId: str, platformUrl: str = "", ) -> str: """Build an HTML invoice summary block for inclusion in the activation email.""" import html as htmlmod from modules.interfaces.interfaceDbSubscription import getRootInterface as getSubRootInterface subInterface = getSubRootInterface() userCount = subInterface.countActiveUsers(mandateId) instanceCount = subInterface.countActiveFeatureInstances(mandateId) billableModules = max(0, instanceCount - plan.includedModules) userPrice = plan.pricePerUserCHF instancePrice = plan.pricePerFeatureInstanceCHF userTotal = userCount * userPrice instanceTotal = billableModules * instancePrice netTotal = userTotal + instanceTotal periodLabel = {"MONTHLY": t("Monatlich"), "YEARLY": t("Jährlich")}.get(plan.billingPeriod, plan.billingPeriod) def _chf(amount: float) -> str: return f"CHF {amount:,.2f}".replace(",", "'") rows = "" if userPrice > 0: rows += ( f'{t("Benutzer-Lizenzen")}' f'{userCount} × {_chf(userPrice)}' f'{_chf(userTotal)}\n' ) if instancePrice > 0 and billableModules > 0: rows += ( f'{t("Module")} ({instanceCount} total, {plan.includedModules} {t("inkl.")})' f'{billableModules} × {_chf(instancePrice)}' f'{_chf(instanceTotal)}\n' ) invoiceLink = "" stripeSubId = subRecord.get("stripeSubscriptionId") if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() invoices = stripe.Invoice.list(subscription=stripeSubId, limit=1) if invoices.data: from modules.shared.stripeClient import stripeToDict inv = stripeToDict(invoices.data[0]) hostedUrl = inv.get("hosted_invoice_url", "") if hostedUrl: invoiceLink = ( f'

' f'' f'{t("Vollständige Rechnung mit MwSt-Ausweis anzeigen")}

\n' ) except Exception as e: logger.warning("Could not fetch Stripe invoice URL for sub %s: %s", stripeSubId, e) return ( f'' f'' f'' f'' f'' f'' f'{rows}' f'' f'' f'' f'' f'' f'
{t("Position")}{t("Menge")} × {t("Preis")}{t("Total")}
{t("Netto-Total")} ({periodLabel}){_chf(netTotal)}
' f'{invoiceLink}' ) def _buildCancelSummaryHtml(subRecord: Dict[str, Any], platformUrl: str = "") -> str: """Build an HTML block with billing link and Stripe invoice link for cancel emails.""" import html as htmlmod parts: list[str] = [] stripeSubId = subRecord.get("stripeSubscriptionId") if stripeSubId: try: from modules.shared.stripeClient import getStripeClient stripe = getStripeClient() invoices = stripe.Invoice.list(subscription=stripeSubId, limit=1) if invoices.data: from modules.shared.stripeClient import stripeToDict inv = stripeToDict(invoices.data[0]) hostedUrl = inv.get("hosted_invoice_url", "") if hostedUrl: parts.append( f'

' f'' f'{t("Letzte Stripe-Rechnung anzeigen")}

' ) except Exception as e: logger.warning("Could not fetch Stripe invoice URL for sub %s: %s", stripeSubId, e) return "\n".join(parts) if parts else "" # ============================================================================ # Exception Classes # ============================================================================ SUBSCRIPTION_USER_ACTION_UPGRADE = "UPGRADE_SUBSCRIPTION" SUBSCRIPTION_USER_ACTION_REACTIVATE = "REACTIVATE_SUBSCRIPTION" SUBSCRIPTION_USER_ACTION_ADD_PAYMENT = "ADD_PAYMENT_METHOD" SUBSCRIPTION_REASONS = { "SUBSCRIPTION_INACTIVE", "SUBSCRIPTION_PAYMENT_REQUIRED", "SUBSCRIPTION_PAYMENT_PENDING", "SUBSCRIPTION_EXPIRED", } def _subscriptionReasonForStatus(status: SubscriptionStatusEnum) -> str: if status == SubscriptionStatusEnum.PENDING: return "SUBSCRIPTION_PAYMENT_PENDING" if status == SubscriptionStatusEnum.PAST_DUE: return "SUBSCRIPTION_PAYMENT_REQUIRED" if status == SubscriptionStatusEnum.EXPIRED: return "SUBSCRIPTION_EXPIRED" return "SUBSCRIPTION_INACTIVE" def _subscriptionUserActionForStatus(status: SubscriptionStatusEnum) -> str: if status in (SubscriptionStatusEnum.PAST_DUE, SubscriptionStatusEnum.PENDING): return SUBSCRIPTION_USER_ACTION_ADD_PAYMENT return SUBSCRIPTION_USER_ACTION_UPGRADE class SubscriptionInactiveException(Exception): def __init__(self, status: SubscriptionStatusEnum, mandateId: str = "", message: Optional[str] = None): self.status = status self.mandateId = mandateId self.reason = _subscriptionReasonForStatus(status) self.userAction = _subscriptionUserActionForStatus(status) self.message = message or t( "Kein aktives Abonnement für diesen Mandanten. Bitte wählen Sie einen Plan unter Billing." ) super().__init__(self.message) def toClientDict(self) -> Dict[str, Any]: out: Dict[str, Any] = { "error": self.reason, "message": self.message, "userAction": self.userAction, "subscriptionUiPath": "/admin/billing?tab=subscription", } if self.mandateId: out["mandateId"] = self.mandateId return out SUBSCRIPTION_USER_ACTION_CONTACT_ADMIN = "CONTACT_ADMIN" def _subscriptionLimitsHint() -> str: return " " + t( "Details zu Ihrem Abonnement, den enthaltenen Limits und Upgrade-Optionen: " "Menü «Administration» → «Billing» → Registerkarte «Abonnement»." ) def _enterpriseLimitsHint() -> str: return " " + t( "Ihr Enterprise-Abonnement wird vom Plattform-Administrator verwaltet. " "Bitte kontaktieren Sie den Administrator für eine Anpassung der Limiten." ) class SubscriptionCapacityException(Exception): def __init__(self, resourceType: str, currentCount: int, maxAllowed: int, message: Optional[str] = None, isEnterprise: bool = False): self.resourceType = resourceType self.currentCount = currentCount self.maxAllowed = maxAllowed self.isEnterprise = isEnterprise hint = _enterpriseLimitsHint() if isEnterprise else _subscriptionLimitsHint() if message is not None: self.message = message elif resourceType == "users": self.message = t( "Mit dem aktuellen Abonnement sind für diesen Mandanten höchstens {maxAllowed} " "Benutzer zulässig (derzeit {currentCount}). " "Ohne Planwechsel können keine weiteren Benutzer hinzugefügt werden." ).format(maxAllowed=maxAllowed, currentCount=currentCount) + hint elif resourceType == "featureInstances": self.message = t( "Es sind höchstens {maxAllowed} aktive Module erlaubt (derzeit {currentCount}). " "Bitte Abonnement erweitern oder ein Modul entfernen." ).format(maxAllowed=maxAllowed, currentCount=currentCount) + hint elif resourceType == "dataVolumeMB": self.message = t( "Das im Abonnement enthaltene Datenvolumen ({maxAllowed} MB) reicht nicht " "(aktuell ca. {currentCount} MB). Bitte Speicher-Limit oder Plan anpassen." ).format(maxAllowed=maxAllowed, currentCount=currentCount) + hint else: self.message = t( "Abonnement-Limit überschritten (Ressource «{resourceType}»: " "aktuell {currentCount}, erlaubt {maxAllowed})." ).format(resourceType=resourceType, currentCount=currentCount, maxAllowed=maxAllowed) + hint super().__init__(self.message) def toClientDict(self) -> Dict[str, Any]: action = SUBSCRIPTION_USER_ACTION_CONTACT_ADMIN if self.isEnterprise else SUBSCRIPTION_USER_ACTION_UPGRADE return { "error": f"SUBSCRIPTION_{self.resourceType.upper()}_LIMIT", "currentCount": self.currentCount, "maxAllowed": self.maxAllowed, "message": self.message, "userAction": action, "subscriptionUiPath": "/admin/billing?tab=subscription", } SubscriptionService.SubscriptionInactiveException = SubscriptionInactiveException SubscriptionService.SubscriptionCapacityException = SubscriptionCapacityException