From d118128813366e340d6565cb7db2fc2bdb610f28 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Wed, 4 Feb 2026 21:50:55 +0100 Subject: [PATCH] billing initial --- app.py | 3 + modules/datamodels/datamodelBilling.py | 265 +++++++ .../automation/datamodelFeatureAutomation.py | 2 + modules/interfaces/interfaceBootstrap.py | 113 +++ modules/interfaces/interfaceDbBilling.py | 734 ++++++++++++++++++ modules/routes/routeBilling.py | 557 +++++++++++++ modules/services/serviceBilling/__init__.py | 7 + .../serviceBilling/mainServiceBilling.py | 408 ++++++++++ .../services/serviceChat/mainServiceChat.py | 62 +- modules/system/mainSystem.py | 32 + 10 files changed, 2182 insertions(+), 1 deletion(-) create mode 100644 modules/datamodels/datamodelBilling.py create mode 100644 modules/interfaces/interfaceDbBilling.py create mode 100644 modules/routes/routeBilling.py create mode 100644 modules/services/serviceBilling/__init__.py create mode 100644 modules/services/serviceBilling/mainServiceBilling.py diff --git a/app.py b/app.py index 474de4d6..609d0c07 100644 --- a/app.py +++ b/app.py @@ -503,6 +503,9 @@ app.include_router(userAccessOverviewRouter) from modules.routes.routeGdpr import router as gdprRouter app.include_router(gdprRouter) +from modules.routes.routeBilling import router as billingRouter +app.include_router(billingRouter) + # ============================================================================ # SYSTEM ROUTES (Navigation, etc.) # ============================================================================ diff --git a/modules/datamodels/datamodelBilling.py b/modules/datamodels/datamodelBilling.py new file mode 100644 index 00000000..e7e59eb4 --- /dev/null +++ b/modules/datamodels/datamodelBilling.py @@ -0,0 +1,265 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Billing models: BillingAccount, BillingTransaction, BillingSettings, UsageStatistics.""" + +from typing import List, Dict, Any, Optional +from enum import Enum +from datetime import date, datetime +from pydantic import BaseModel, Field +from modules.shared.attributeUtils import registerModelLabels +import uuid + + +class BillingModelEnum(str, Enum): + """Billing model types.""" + PREPAY_MANDATE = "PREPAY_MANDATE" # Prepaid budget shared by all users in mandate + PREPAY_USER = "PREPAY_USER" # Prepaid budget per user within mandate + CREDIT_POSTPAY = "CREDIT_POSTPAY" # Credit with monthly invoice (requires billing address) + UNLIMITED = "UNLIMITED" # No cost limitation (internal mandates only) + + +class AccountTypeEnum(str, Enum): + """Account type for billing accounts.""" + MANDATE = "MANDATE" # Account for entire mandate + USER = "USER" # Account for specific user within mandate + + +class TransactionTypeEnum(str, Enum): + """Transaction types for billing.""" + CREDIT = "CREDIT" # Credit/top-up (positive) + DEBIT = "DEBIT" # Debit/usage (positive amount, reduces balance) + ADJUSTMENT = "ADJUSTMENT" # Manual adjustment by admin + + +class ReferenceTypeEnum(str, Enum): + """Reference types for transactions.""" + WORKFLOW = "WORKFLOW" # AI workflow usage + PAYMENT = "PAYMENT" # Payment/top-up + ADMIN = "ADMIN" # Admin adjustment + SYSTEM = "SYSTEM" # System credit (e.g., initial credit) + + +class PeriodTypeEnum(str, Enum): + """Period types for usage statistics.""" + DAY = "DAY" + MONTH = "MONTH" + YEAR = "YEAR" + + +class BillingAddress(BaseModel): + """Billing address for CREDIT_POSTPAY mandates.""" + company: str = Field(..., description="Company name") + street: str = Field(..., description="Street and number") + zip: str = Field(..., description="Postal code") + city: str = Field(..., description="City") + country: str = Field(default="CH", description="Country code") + vatNumber: Optional[str] = Field(None, description="VAT number (optional)") + + +registerModelLabels( + "BillingAddress", + {"en": "Billing Address", "de": "Rechnungsadresse"}, + { + "company": {"en": "Company", "de": "Firma"}, + "street": {"en": "Street", "de": "Strasse"}, + "zip": {"en": "ZIP", "de": "PLZ"}, + "city": {"en": "City", "de": "Ort"}, + "country": {"en": "Country", "de": "Land"}, + "vatNumber": {"en": "VAT Number", "de": "MwSt-Nummer"}, + }, +) + + +class BillingAccount(BaseModel): + """Billing account for mandate or user-mandate combination.""" + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), description="Primary key" + ) + mandateId: str = Field(..., description="Foreign key to Mandate") + userId: Optional[str] = Field(None, description="Foreign key to User (only for PREPAY_USER)") + accountType: AccountTypeEnum = Field(..., description="Account type: MANDATE or USER") + balance: float = Field(default=0.0, description="Current balance in CHF") + creditLimit: Optional[float] = Field(None, description="Credit limit in CHF (only for CREDIT_POSTPAY)") + warningThreshold: float = Field(default=0.0, description="Warning threshold in CHF") + lastWarningAt: Optional[datetime] = Field(None, description="Last warning sent timestamp") + enabled: bool = Field(default=True, description="Account is active") + + +registerModelLabels( + "BillingAccount", + {"en": "Billing Account", "de": "Abrechnungskonto"}, + { + "id": {"en": "ID", "de": "ID"}, + "mandateId": {"en": "Mandate ID", "de": "Mandanten-ID"}, + "userId": {"en": "User ID", "de": "Benutzer-ID"}, + "accountType": {"en": "Account Type", "de": "Kontotyp"}, + "balance": {"en": "Balance (CHF)", "de": "Guthaben (CHF)"}, + "creditLimit": {"en": "Credit Limit (CHF)", "de": "Kreditlimit (CHF)"}, + "warningThreshold": {"en": "Warning Threshold (CHF)", "de": "Warnschwelle (CHF)"}, + "lastWarningAt": {"en": "Last Warning", "de": "Letzte Warnung"}, + "enabled": {"en": "Enabled", "de": "Aktiv"}, + }, +) + + +class BillingTransaction(BaseModel): + """Single billing transaction (credit, debit, adjustment).""" + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), description="Primary key" + ) + accountId: str = Field(..., description="Foreign key to BillingAccount") + transactionType: TransactionTypeEnum = Field(..., description="Transaction type") + amount: float = Field(..., description="Amount in CHF (always positive)") + description: str = Field(..., description="Transaction description") + + # Reference to source + referenceType: Optional[ReferenceTypeEnum] = Field(None, description="Reference type") + referenceId: Optional[str] = Field(None, description="Reference ID") + + # Context for workflow transactions + workflowId: Optional[str] = Field(None, description="Workflow ID (for WORKFLOW transactions)") + featureInstanceId: Optional[str] = Field(None, description="Feature instance ID") + featureCode: Optional[str] = Field(None, description="Feature code (e.g., chatplayground, automation)") + aicoreProvider: Optional[str] = Field(None, description="AICore provider (anthropic, openai, etc.)") + + +registerModelLabels( + "BillingTransaction", + {"en": "Billing Transaction", "de": "Transaktion"}, + { + "id": {"en": "ID", "de": "ID"}, + "accountId": {"en": "Account ID", "de": "Konto-ID"}, + "transactionType": {"en": "Type", "de": "Typ"}, + "amount": {"en": "Amount (CHF)", "de": "Betrag (CHF)"}, + "description": {"en": "Description", "de": "Beschreibung"}, + "referenceType": {"en": "Reference Type", "de": "Referenztyp"}, + "referenceId": {"en": "Reference ID", "de": "Referenz-ID"}, + "workflowId": {"en": "Workflow ID", "de": "Workflow-ID"}, + "featureInstanceId": {"en": "Feature Instance ID", "de": "Feature-Instanz-ID"}, + "featureCode": {"en": "Feature Code", "de": "Feature-Code"}, + "aicoreProvider": {"en": "AI Provider", "de": "AI-Anbieter"}, + }, +) + + +class BillingSettings(BaseModel): + """Billing settings per mandate.""" + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), description="Primary key" + ) + mandateId: str = Field(..., description="Foreign key to Mandate (UNIQUE)") + billingModel: BillingModelEnum = Field(..., description="Billing model") + + # Configuration + defaultUserCredit: float = Field(default=10.0, description="Initial credit in CHF for new users (PREPAY_USER)") + warningThresholdPercent: float = Field(default=10.0, description="Warning threshold as percentage") + blockOnZeroBalance: bool = Field(default=True, description="Block AI features when balance is zero") + + # Billing address (required for CREDIT_POSTPAY) + billingAddress: Optional[BillingAddress] = Field(None, description="Billing address") + + # Notifications + notifyEmails: List[str] = Field(default_factory=list, description="Email addresses for billing notifications") + notifyOnWarning: bool = Field(default=True, description="Send email when warning threshold is reached") + + +registerModelLabels( + "BillingSettings", + {"en": "Billing Settings", "de": "Abrechnungseinstellungen"}, + { + "id": {"en": "ID", "de": "ID"}, + "mandateId": {"en": "Mandate ID", "de": "Mandanten-ID"}, + "billingModel": {"en": "Billing Model", "de": "Abrechnungsmodell"}, + "defaultUserCredit": {"en": "Default User Credit (CHF)", "de": "Standard-Startguthaben (CHF)"}, + "warningThresholdPercent": {"en": "Warning Threshold (%)", "de": "Warnschwelle (%)"}, + "blockOnZeroBalance": {"en": "Block on Zero Balance", "de": "Bei 0 blockieren"}, + "billingAddress": {"en": "Billing Address", "de": "Rechnungsadresse"}, + "notifyEmails": {"en": "Notification Emails", "de": "Benachrichtigungs-Emails"}, + "notifyOnWarning": {"en": "Notify on Warning", "de": "Bei Warnung benachrichtigen"}, + }, +) + + +class UsageStatistics(BaseModel): + """Aggregated usage statistics for quick retrieval.""" + id: str = Field( + default_factory=lambda: str(uuid.uuid4()), description="Primary key" + ) + accountId: str = Field(..., description="Foreign key to BillingAccount") + periodType: PeriodTypeEnum = Field(..., description="Period type") + periodStart: date = Field(..., description="Period start date") + + # Aggregated values + totalCostCHF: float = Field(default=0.0, description="Total cost in CHF") + transactionCount: int = Field(default=0, description="Number of transactions") + + # Breakdown by provider + costByProvider: Dict[str, float] = Field( + default_factory=dict, + description="Cost breakdown by provider (e.g., {'anthropic': 12.50, 'openai': 8.30})" + ) + + # Breakdown by feature + costByFeature: Dict[str, float] = Field( + default_factory=dict, + description="Cost breakdown by feature (e.g., {'chatplayground': 15.00, 'automation': 5.80})" + ) + + +registerModelLabels( + "UsageStatistics", + {"en": "Usage Statistics", "de": "Nutzungsstatistik"}, + { + "id": {"en": "ID", "de": "ID"}, + "accountId": {"en": "Account ID", "de": "Konto-ID"}, + "periodType": {"en": "Period Type", "de": "Periodentyp"}, + "periodStart": {"en": "Period Start", "de": "Periodenbeginn"}, + "totalCostCHF": {"en": "Total Cost (CHF)", "de": "Gesamtkosten (CHF)"}, + "transactionCount": {"en": "Transaction Count", "de": "Anzahl Transaktionen"}, + "costByProvider": {"en": "Cost by Provider", "de": "Kosten nach Anbieter"}, + "costByFeature": {"en": "Cost by Feature", "de": "Kosten nach Feature"}, + }, +) + + +# ============================================================================ +# Response Models for API +# ============================================================================ + +class BillingBalanceResponse(BaseModel): + """Response model for balance endpoint.""" + mandateId: str + mandateName: str + billingModel: BillingModelEnum + balance: float + currency: str = "CHF" + warningThreshold: float + isWarning: bool + creditLimit: Optional[float] = None + + +class BillingStatisticsChartData(BaseModel): + """Chart data point for statistics.""" + label: str + totalCost: float + byProvider: Dict[str, float] + + +class BillingStatisticsResponse(BaseModel): + """Response model for statistics endpoint.""" + mandateId: str + period: PeriodTypeEnum + year: int + month: Optional[int] = None + currency: str = "CHF" + data: List[BillingStatisticsChartData] + totals: Dict[str, Any] + + +class BillingCheckResult(BaseModel): + """Result of a billing balance check.""" + allowed: bool + reason: Optional[str] = None + currentBalance: Optional[float] = None + requiredAmount: Optional[float] = None + billingModel: Optional[BillingModelEnum] = None diff --git a/modules/features/automation/datamodelFeatureAutomation.py b/modules/features/automation/datamodelFeatureAutomation.py index 6d1e906f..b6b32c22 100644 --- a/modules/features/automation/datamodelFeatureAutomation.py +++ b/modules/features/automation/datamodelFeatureAutomation.py @@ -25,6 +25,7 @@ class AutomationDefinition(BaseModel): eventId: Optional[str] = Field(None, description="Event ID from event management (None if not registered)", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) status: Optional[str] = Field(None, description="Status: 'active' if event is registered, 'inactive' if not (computed, readonly)", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) executionLogs: List[Dict[str, Any]] = Field(default_factory=list, description="List of execution logs, each containing timestamp, workflowId, status, and messages", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) + allowedProviders: List[str] = Field(default_factory=list, description="List of allowed AICore providers (e.g., 'anthropic', 'openai'). Empty means all RBAC-permitted providers are allowed.", json_schema_extra={"frontend_type": "multiselect", "frontend_readonly": False, "frontend_required": False}) registerModelLabels( @@ -42,6 +43,7 @@ registerModelLabels( "eventId": {"en": "Event ID", "ge": "Event-ID", "fr": "ID de l'événement"}, "status": {"en": "Status", "ge": "Status", "fr": "Statut"}, "executionLogs": {"en": "Execution Logs", "ge": "Ausführungsprotokolle", "fr": "Journaux d'exécution"}, + "allowedProviders": {"en": "Allowed Providers", "ge": "Erlaubte Provider", "fr": "Fournisseurs autorisés"}, }, ) diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 0b630f85..d6f0f063 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -76,6 +76,10 @@ def initBootstrap(db: DatabaseConnector) -> None: # Initialize feature instances for root mandate if mandateId: initRootMandateFeatures(db, mandateId) + + # Initialize billing settings for root mandate + if mandateId: + initRootMandateBilling(mandateId) def initAutomationTemplates(dbApp: DatabaseConnector, adminUserId: Optional[str] = None) -> None: @@ -1192,6 +1196,115 @@ def _createResourceContextRules(db: DatabaseConnector) -> None: db.recordCreate(AccessRule, rule) logger.info(f"Created {len(resourceRules)} RESOURCE context rules") + + # Create AICore provider RBAC rules + _createAicoreProviderRules(db) + + +def _createAicoreProviderRules(db: DatabaseConnector) -> None: + """ + Create RBAC rules for AICore providers (resource.aicore.{provider}). + All roles get access to all providers by default. + + NOTE: Provider list is dynamically discovered from AICore model registry. + + Args: + db: Database connector instance + """ + try: + from modules.aicore.aicoreModelRegistry import modelRegistry + + # Discover available connectors dynamically + connectors = modelRegistry.discoverConnectors() + providers = [c.getConnectorType() for c in connectors] + + if not providers: + logger.warning("No AICore providers discovered, skipping provider RBAC rules") + return + + logger.info(f"Creating RBAC rules for AICore providers: {providers}") + + providerRules = [] + + # All roles get access to all providers (as per requirement) + for roleLabel in ["admin", "user", "viewer"]: + roleId = _getRoleId(db, roleLabel) + if not roleId: + continue + + for provider in providers: + resourceKey = f"resource.aicore.{provider}" + + # Check if rule already exists + existingRules = db.getRecordset( + AccessRule, + recordFilter={ + "roleId": roleId, + "context": AccessRuleContext.RESOURCE.value, + "item": resourceKey + } + ) + + if not existingRules: + providerRules.append(AccessRule( + roleId=roleId, + context=AccessRuleContext.RESOURCE, + item=resourceKey, + view=True, # view=True means "can use" for RESOURCE context + read=None, + create=None, + update=None, + delete=None, + )) + + for rule in providerRules: + db.recordCreate(AccessRule, rule) + + if providerRules: + logger.info(f"Created {len(providerRules)} AICore provider RBAC rules") + else: + logger.debug("All AICore provider RBAC rules already exist") + + except Exception as e: + logger.warning(f"Failed to create AICore provider RBAC rules: {e}") + + +def initRootMandateBilling(mandateId: str) -> None: + """ + Initialize billing settings for root mandate. + Root mandate uses PREPAY_USER model with 10 CHF initial credit per user. + + Args: + mandateId: Root mandate ID + """ + try: + from modules.interfaces.interfaceDbBilling import _getRootInterface + from modules.datamodels.datamodelBilling import BillingSettings, BillingModelEnum + + billingInterface = _getRootInterface() + + # Check if settings already exist + existingSettings = billingInterface.getSettings(mandateId) + if existingSettings: + logger.info("Billing settings for root mandate already exist") + return + + # Create billing settings for root mandate + settings = BillingSettings( + mandateId=mandateId, + billingModel=BillingModelEnum.PREPAY_USER, + defaultUserCredit=10.0, # 10 CHF initial credit per user + warningThresholdPercent=10.0, + blockOnZeroBalance=True, + notifyOnWarning=True + ) + + billingInterface.createSettings(settings) + logger.info(f"Created billing settings for root mandate: PREPAY_USER with 10 CHF default credit") + + except Exception as e: + # Don't fail bootstrap if billing init fails + logger.warning(f"Failed to initialize root mandate billing (non-critical): {e}") def assignInitialUserMemberships( diff --git a/modules/interfaces/interfaceDbBilling.py b/modules/interfaces/interfaceDbBilling.py new file mode 100644 index 00000000..ebbe3ae5 --- /dev/null +++ b/modules/interfaces/interfaceDbBilling.py @@ -0,0 +1,734 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Interface for Billing operations. +Manages billing accounts, transactions, and usage statistics. + +All billing data is stored in the poweron_billing database. +""" + +import logging +from typing import Dict, Any, List, Optional +from datetime import date, datetime, timedelta +import uuid + +from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.shared.configuration import APP_CONFIG +from modules.shared.timeUtils import getUtcTimestamp +from modules.datamodels.datamodelUam import User +from modules.datamodels.datamodelBilling import ( + BillingAccount, + BillingTransaction, + BillingSettings, + UsageStatistics, + BillingAddress, + BillingModelEnum, + AccountTypeEnum, + TransactionTypeEnum, + ReferenceTypeEnum, + PeriodTypeEnum, + BillingBalanceResponse, + BillingCheckResult, +) + +logger = logging.getLogger(__name__) + +# Singleton factory for BillingObjects instances +_billingInterfaces: Dict[str, "BillingObjects"] = {} + +# Database name for billing +BILLING_DATABASE = "poweron_billing" + + +def getInterface(currentUser: User, mandateId: str = None) -> "BillingObjects": + """ + Factory function to get or create a BillingObjects instance. + + Args: + currentUser: Current user object + mandateId: Mandate ID for context + + Returns: + BillingObjects instance + """ + cacheKey = f"{currentUser.id}_{mandateId}" + + if cacheKey not in _billingInterfaces: + _billingInterfaces[cacheKey] = BillingObjects(currentUser, mandateId) + else: + _billingInterfaces[cacheKey].setUserContext(currentUser, mandateId) + + return _billingInterfaces[cacheKey] + + +def _getRootInterface() -> "BillingObjects": + """Get interface with system access for bootstrap operations.""" + from modules.security.rootAccess import getRootUser + rootUser = getRootUser() + return BillingObjects(rootUser, mandateId=None) + + +class BillingObjects: + """ + Interface for billing operations. + Manages accounts, transactions, settings, and statistics. + """ + + def __init__(self, currentUser: Optional[User] = None, mandateId: str = None): + """ + Initialize the billing interface. + + Args: + currentUser: Current user object + mandateId: Mandate ID for context + """ + self.currentUser = currentUser + self.userId = currentUser.id if currentUser else None + self.mandateId = mandateId + + # Initialize database connection + self._initializeDatabase() + + def _initializeDatabase(self): + """Initialize database connection.""" + self.db = DatabaseConnector( + databaseName=BILLING_DATABASE, + host=APP_CONFIG.get('Database_Host', 'localhost'), + port=int(APP_CONFIG.get('Database_Port', '5432')), + user=APP_CONFIG.get('Database_User', 'admin'), + password=APP_CONFIG.get('Database_Password', 'admin') + ) + + def setUserContext(self, currentUser: User, mandateId: str = None): + """ + Update user context. + + Args: + currentUser: Current user object + mandateId: Mandate ID for context + """ + self.currentUser = currentUser + self.userId = currentUser.id if currentUser else None + self.mandateId = mandateId + + # ========================================================================= + # BillingSettings Operations + # ========================================================================= + + def getSettings(self, mandateId: str) -> Optional[Dict[str, Any]]: + """ + Get billing settings for a mandate. + + Args: + mandateId: Mandate ID + + Returns: + BillingSettings dict or None if not found + """ + try: + results = self.db.getRecordset( + BillingSettings, + filterDict={"mandateId": mandateId} + ) + return results[0] if results else None + except Exception as e: + logger.error(f"Error getting billing settings: {e}") + return None + + def createSettings(self, settings: BillingSettings) -> Dict[str, Any]: + """ + Create billing settings for a mandate. + + Args: + settings: BillingSettings object + + Returns: + Created settings dict + """ + settingsDict = settings.model_dump(exclude_none=True) + + # Handle nested BillingAddress + if settings.billingAddress: + settingsDict["billingAddress"] = settings.billingAddress.model_dump() + + return self.db.recordCreate(BillingSettings, settingsDict) + + def updateSettings(self, settingsId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Update billing settings. + + Args: + settingsId: Settings ID + updates: Fields to update + + Returns: + Updated settings dict or None + """ + return self.db.recordModify(BillingSettings, settingsId, updates) + + def getOrCreateSettings(self, mandateId: str, defaultModel: BillingModelEnum = BillingModelEnum.UNLIMITED) -> Dict[str, Any]: + """ + Get or create billing settings for a mandate. + + Args: + mandateId: Mandate ID + defaultModel: Default billing model if creating + + Returns: + BillingSettings dict + """ + existing = self.getSettings(mandateId) + if existing: + return existing + + settings = BillingSettings( + mandateId=mandateId, + billingModel=defaultModel, + defaultUserCredit=10.0, + warningThresholdPercent=10.0, + blockOnZeroBalance=True, + notifyOnWarning=True + ) + return self.createSettings(settings) + + # ========================================================================= + # BillingAccount Operations + # ========================================================================= + + def getAccount(self, accountId: str) -> Optional[Dict[str, Any]]: + """Get a billing account by ID.""" + try: + results = self.db.getRecordset( + BillingAccount, + filterDict={"id": accountId} + ) + return results[0] if results else None + except Exception as e: + logger.error(f"Error getting billing account: {e}") + return None + + def getMandateAccount(self, mandateId: str) -> Optional[Dict[str, Any]]: + """ + Get the mandate-level billing account. + + Args: + mandateId: Mandate ID + + Returns: + BillingAccount dict or None + """ + try: + results = self.db.getRecordset( + BillingAccount, + filterDict={ + "mandateId": mandateId, + "accountType": AccountTypeEnum.MANDATE.value + } + ) + return results[0] if results else None + except Exception as e: + logger.error(f"Error getting mandate account: {e}") + return None + + def getUserAccount(self, mandateId: str, userId: str) -> Optional[Dict[str, Any]]: + """ + Get a user-level billing account within a mandate. + + Args: + mandateId: Mandate ID + userId: User ID + + Returns: + BillingAccount dict or None + """ + try: + results = self.db.getRecordset( + BillingAccount, + filterDict={ + "mandateId": mandateId, + "userId": userId, + "accountType": AccountTypeEnum.USER.value + } + ) + return results[0] if results else None + except Exception as e: + logger.error(f"Error getting user account: {e}") + return None + + def createAccount(self, account: BillingAccount) -> Dict[str, Any]: + """ + Create a new billing account. + + Args: + account: BillingAccount object + + Returns: + Created account dict + """ + accountDict = account.model_dump(exclude_none=True) + return self.db.recordCreate(BillingAccount, accountDict) + + def updateAccountBalance(self, accountId: str, newBalance: float) -> Optional[Dict[str, Any]]: + """ + Update account balance atomically. + + Args: + accountId: Account ID + newBalance: New balance value + + Returns: + Updated account dict or None + """ + return self.db.recordModify(BillingAccount, accountId, {"balance": newBalance}) + + def getOrCreateMandateAccount(self, mandateId: str, initialBalance: float = 0.0) -> Dict[str, Any]: + """ + Get or create a mandate-level billing account. + + Args: + mandateId: Mandate ID + initialBalance: Initial balance if creating + + Returns: + BillingAccount dict + """ + existing = self.getMandateAccount(mandateId) + if existing: + return existing + + account = BillingAccount( + mandateId=mandateId, + accountType=AccountTypeEnum.MANDATE, + balance=initialBalance, + enabled=True + ) + return self.createAccount(account) + + def getOrCreateUserAccount(self, mandateId: str, userId: str, initialBalance: float = 0.0) -> Dict[str, Any]: + """ + Get or create a user-level billing account. + + Args: + mandateId: Mandate ID + userId: User ID + initialBalance: Initial balance if creating + + Returns: + BillingAccount dict + """ + existing = self.getUserAccount(mandateId, userId) + if existing: + return existing + + account = BillingAccount( + mandateId=mandateId, + userId=userId, + accountType=AccountTypeEnum.USER, + balance=initialBalance, + enabled=True + ) + created = self.createAccount(account) + + # If initial balance > 0, create a SYSTEM credit transaction + if initialBalance > 0: + self.createTransaction(BillingTransaction( + accountId=created["id"], + transactionType=TransactionTypeEnum.CREDIT, + amount=initialBalance, + description="Initial credit for new user", + referenceType=ReferenceTypeEnum.SYSTEM + )) + + return created + + # ========================================================================= + # BillingTransaction Operations + # ========================================================================= + + def createTransaction(self, transaction: BillingTransaction) -> Dict[str, Any]: + """ + Create a new billing transaction and update account balance. + + Args: + transaction: BillingTransaction object + + Returns: + Created transaction dict + """ + # Get current account + account = self.getAccount(transaction.accountId) + if not account: + raise ValueError(f"Account {transaction.accountId} not found") + + currentBalance = account.get("balance", 0.0) + + # Calculate new balance + if transaction.transactionType == TransactionTypeEnum.CREDIT: + newBalance = currentBalance + transaction.amount + elif transaction.transactionType == TransactionTypeEnum.DEBIT: + newBalance = currentBalance - transaction.amount + else: # ADJUSTMENT + newBalance = currentBalance + transaction.amount # Can be positive or negative + + # Create transaction + transactionDict = transaction.model_dump(exclude_none=True) + created = self.db.recordCreate(BillingTransaction, transactionDict) + + # Update account balance + self.updateAccountBalance(transaction.accountId, newBalance) + + logger.info(f"Billing transaction created: {transaction.transactionType.value} {transaction.amount} CHF, " + f"balance: {currentBalance} -> {newBalance}") + + return created + + def getTransactions( + self, + accountId: str, + limit: int = 100, + offset: int = 0, + startDate: date = None, + endDate: date = None + ) -> List[Dict[str, Any]]: + """ + Get transactions for an account. + + Args: + accountId: Account ID + limit: Maximum number of results + offset: Offset for pagination + startDate: Filter by start date + endDate: Filter by end date + + Returns: + List of transaction dicts + """ + try: + filterDict = {"accountId": accountId} + results = self.db.getRecordset(BillingTransaction, filterDict=filterDict) + + # Apply date filters if provided + if startDate or endDate: + filtered = [] + for t in results: + createdAt = t.get("_createdAt") + if createdAt: + tDate = createdAt.date() if isinstance(createdAt, datetime) else createdAt + if startDate and tDate < startDate: + continue + if endDate and tDate > endDate: + continue + filtered.append(t) + results = filtered + + # Sort by creation date descending + results.sort(key=lambda x: x.get("_createdAt", ""), reverse=True) + + # Apply pagination + return results[offset:offset + limit] + except Exception as e: + logger.error(f"Error getting transactions: {e}") + return [] + + def getTransactionsByMandate(self, mandateId: str, limit: int = 100) -> List[Dict[str, Any]]: + """ + Get all transactions for a mandate (across all accounts). + + Args: + mandateId: Mandate ID + limit: Maximum number of results + + Returns: + List of transaction dicts + """ + # Get all accounts for mandate + accounts = self.db.getRecordset(BillingAccount, filterDict={"mandateId": mandateId}) + + allTransactions = [] + for account in accounts: + transactions = self.getTransactions(account["id"], limit=limit) + allTransactions.extend(transactions) + + # Sort by creation date descending and limit + allTransactions.sort(key=lambda x: x.get("_createdAt", ""), reverse=True) + return allTransactions[:limit] + + # ========================================================================= + # Balance Check Operations + # ========================================================================= + + def checkBalance(self, mandateId: str, userId: str, estimatedCost: float) -> BillingCheckResult: + """ + Check if there's sufficient balance for an operation. + + Args: + mandateId: Mandate ID + userId: User ID + estimatedCost: Estimated cost of the operation + + Returns: + BillingCheckResult + """ + settings = self.getSettings(mandateId) + if not settings: + # No settings = no billing = allowed + return BillingCheckResult(allowed=True, billingModel=BillingModelEnum.UNLIMITED) + + billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + + # UNLIMITED = always allowed + if billingModel == BillingModelEnum.UNLIMITED: + return BillingCheckResult(allowed=True, billingModel=billingModel) + + # Get the relevant account + if billingModel == BillingModelEnum.PREPAY_USER: + account = self.getUserAccount(mandateId, userId) + else: + account = self.getMandateAccount(mandateId) + + if not account: + # No account = no balance = potentially blocked + if settings.get("blockOnZeroBalance", True): + return BillingCheckResult( + allowed=False, + reason="NO_ACCOUNT", + currentBalance=0.0, + requiredAmount=estimatedCost, + billingModel=billingModel + ) + return BillingCheckResult(allowed=True, currentBalance=0.0, billingModel=billingModel) + + currentBalance = account.get("balance", 0.0) + + # CREDIT_POSTPAY with credit limit check + if billingModel == BillingModelEnum.CREDIT_POSTPAY: + creditLimit = account.get("creditLimit") + if creditLimit and abs(currentBalance) + estimatedCost > creditLimit: + return BillingCheckResult( + allowed=False, + reason="CREDIT_LIMIT_EXCEEDED", + currentBalance=currentBalance, + requiredAmount=estimatedCost, + billingModel=billingModel + ) + return BillingCheckResult(allowed=True, currentBalance=currentBalance, billingModel=billingModel) + + # PREPAY models - check balance + if currentBalance < estimatedCost: + if settings.get("blockOnZeroBalance", True): + return BillingCheckResult( + allowed=False, + reason="INSUFFICIENT_BALANCE", + currentBalance=currentBalance, + requiredAmount=estimatedCost, + billingModel=billingModel + ) + + return BillingCheckResult(allowed=True, currentBalance=currentBalance, billingModel=billingModel) + + def recordUsage( + self, + mandateId: str, + userId: str, + priceCHF: float, + workflowId: str = None, + featureInstanceId: str = None, + featureCode: str = None, + aicoreProvider: str = None, + description: str = "AI Usage" + ) -> Optional[Dict[str, Any]]: + """ + Record usage cost as a billing transaction. + + Args: + mandateId: Mandate ID + userId: User ID + priceCHF: Cost in CHF + workflowId: Optional workflow ID + featureInstanceId: Optional feature instance ID + featureCode: Optional feature code + aicoreProvider: Optional AICore provider name + description: Transaction description + + Returns: + Created transaction dict or None + """ + if priceCHF <= 0: + return None + + settings = self.getSettings(mandateId) + if not settings: + logger.debug(f"No billing settings for mandate {mandateId}, skipping usage recording") + return None + + billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + + # UNLIMITED = no transaction recording + if billingModel == BillingModelEnum.UNLIMITED: + return None + + # Get or create the relevant account + if billingModel == BillingModelEnum.PREPAY_USER: + account = self.getOrCreateUserAccount(mandateId, userId) + else: + account = self.getOrCreateMandateAccount(mandateId) + + # Create debit transaction + transaction = BillingTransaction( + accountId=account["id"], + transactionType=TransactionTypeEnum.DEBIT, + amount=priceCHF, + description=description, + referenceType=ReferenceTypeEnum.WORKFLOW, + workflowId=workflowId, + featureInstanceId=featureInstanceId, + featureCode=featureCode, + aicoreProvider=aicoreProvider + ) + + return self.createTransaction(transaction) + + # ========================================================================= + # Statistics Operations + # ========================================================================= + + def getUsageStatistics( + self, + accountId: str, + periodType: PeriodTypeEnum, + year: int, + month: int = None + ) -> List[Dict[str, Any]]: + """ + Get usage statistics for an account. + + Args: + accountId: Account ID + periodType: Period type (DAY, MONTH, YEAR) + year: Year + month: Month (for DAY period type) + + Returns: + List of statistics dicts + """ + filterDict = { + "accountId": accountId, + "periodType": periodType.value + } + + results = self.db.getRecordset(UsageStatistics, filterDict=filterDict) + + # Filter by year + filtered = [s for s in results if s.get("periodStart") and s["periodStart"].year == year] + + # Filter by month if specified + if month and periodType == PeriodTypeEnum.DAY: + filtered = [s for s in filtered if s["periodStart"].month == month] + + return sorted(filtered, key=lambda x: x.get("periodStart", date.min)) + + def calculateStatisticsFromTransactions( + self, + accountId: str, + startDate: date, + endDate: date + ) -> Dict[str, Any]: + """ + Calculate statistics from transactions for a period. + + Args: + accountId: Account ID + startDate: Start date + endDate: End date + + Returns: + Statistics dict + """ + transactions = self.getTransactions(accountId, limit=10000, startDate=startDate, endDate=endDate) + + # Filter only DEBIT transactions (usage) + debits = [t for t in transactions if t.get("transactionType") == TransactionTypeEnum.DEBIT.value] + + totalCost = sum(t.get("amount", 0) for t in debits) + + # Calculate by provider + costByProvider = {} + for t in debits: + provider = t.get("aicoreProvider", "unknown") + costByProvider[provider] = costByProvider.get(provider, 0) + t.get("amount", 0) + + # Calculate by feature + costByFeature = {} + for t in debits: + feature = t.get("featureCode", "unknown") + costByFeature[feature] = costByFeature.get(feature, 0) + t.get("amount", 0) + + return { + "totalCostCHF": totalCost, + "transactionCount": len(debits), + "costByProvider": costByProvider, + "costByFeature": costByFeature + } + + # ========================================================================= + # Utility Methods + # ========================================================================= + + def getBalancesForUser(self, userId: str) -> List[BillingBalanceResponse]: + """ + Get all billing balances for a user across mandates. + + Args: + userId: User ID + + Returns: + List of BillingBalanceResponse + """ + from modules.interfaces.interfaceDbApp import getInterface as getAppInterface + + balances = [] + + # Get all mandates the user belongs to + try: + appInterface = getAppInterface(self.currentUser) + userMandates = appInterface.getUserMandates(userId) + + for um in userMandates: + mandateId = um.get("mandateId") + mandate = appInterface.getMandate(mandateId) + if not mandate: + continue + + settings = self.getSettings(mandateId) + if not settings: + continue + + billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + + # Get the relevant account + if billingModel == BillingModelEnum.PREPAY_USER: + account = self.getUserAccount(mandateId, userId) + elif billingModel in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]: + account = self.getMandateAccount(mandateId) + else: + continue + + if not account: + continue + + balance = account.get("balance", 0.0) + warningThreshold = account.get("warningThreshold", 0.0) + + balances.append(BillingBalanceResponse( + mandateId=mandateId, + mandateName=mandate.get("name", ""), + billingModel=billingModel, + balance=balance, + warningThreshold=warningThreshold, + isWarning=balance <= warningThreshold, + creditLimit=account.get("creditLimit") + )) + except Exception as e: + logger.error(f"Error getting balances for user: {e}") + + return balances diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py new file mode 100644 index 00000000..c191e793 --- /dev/null +++ b/modules/routes/routeBilling.py @@ -0,0 +1,557 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Billing routes for the backend API. +Implements the endpoints for billing management and usage tracking. + +Features: +- User endpoints: View balance, transactions, statistics +- Admin endpoints: Manage settings, add credits, view all accounts +""" + +from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query +from typing import List, Dict, Any, Optional +from fastapi import status +import logging +from datetime import date, datetime +from pydantic import BaseModel, Field + +# Import auth module +from modules.auth import limiter, requireSysAdmin, getRequestContext, RequestContext + +# Import billing components +from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface +from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService +from modules.datamodels.datamodelBilling import ( + BillingAccount, + BillingTransaction, + BillingSettings, + BillingAddress, + BillingModelEnum, + TransactionTypeEnum, + ReferenceTypeEnum, + PeriodTypeEnum, + BillingBalanceResponse, + BillingStatisticsResponse, + BillingStatisticsChartData, + BillingCheckResult, +) + +# Configure logger +logger = logging.getLogger(__name__) + +# ============================================================================= +# Request/Response Models +# ============================================================================= + +class CreditAddRequest(BaseModel): + """Request model for adding credit to an account.""" + userId: Optional[str] = Field(None, description="Target user ID (for PREPAY_USER model)") + amount: float = Field(..., gt=0, description="Amount to credit in CHF") + description: str = Field(default="Manual credit", description="Transaction description") + + +class BillingSettingsUpdate(BaseModel): + """Request model for updating billing settings.""" + billingModel: Optional[BillingModelEnum] = None + defaultUserCredit: Optional[float] = Field(None, ge=0) + warningThresholdPercent: Optional[float] = Field(None, ge=0, le=100) + blockOnZeroBalance: Optional[bool] = None + notifyOnWarning: Optional[bool] = None + notifyEmails: Optional[List[str]] = None + billingAddress: Optional[BillingAddress] = None + + +class TransactionResponse(BaseModel): + """Response model for a billing transaction.""" + id: str + accountId: str + transactionType: TransactionTypeEnum + amount: float + description: str + referenceType: Optional[ReferenceTypeEnum] + workflowId: Optional[str] + featureCode: Optional[str] + aicoreProvider: Optional[str] + createdAt: Optional[datetime] + + +class AccountSummary(BaseModel): + """Summary of a billing account.""" + id: str + mandateId: str + userId: Optional[str] + accountType: str + balance: float + creditLimit: Optional[float] + warningThreshold: float + enabled: bool + + +class UsageReportResponse(BaseModel): + """Usage report for a period.""" + period: str + totalCost: float + transactionCount: int + costByProvider: Dict[str, float] + costByFeature: Dict[str, float] + + +# ============================================================================= +# Router Setup +# ============================================================================= + +router = APIRouter( + prefix="/api/billing", + tags=["Billing"], + responses={404: {"description": "Not found"}} +) + +# ============================================================================= +# User Endpoints +# ============================================================================= + +@router.get("/balance", response_model=List[BillingBalanceResponse]) +@limiter.limit("60/minute") +async def getBalance( + request: Request, + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get billing balances for all mandates the current user belongs to. + Returns balance information for each mandate. + """ + try: + billingService = getBillingService( + ctx.currentUser, + ctx.mandateId, + featureCode="billing" + ) + + balances = billingService.getBalancesForUser() + return balances + + except Exception as e: + logger.error(f"Error getting billing balance: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/balance/{mandateId}", response_model=BillingBalanceResponse) +@limiter.limit("60/minute") +async def getBalanceForMandate( + request: Request, + mandateId: str = Path(..., description="Mandate ID"), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get billing balance for a specific mandate. + """ + try: + billingService = getBillingService( + ctx.currentUser, + mandateId, + featureCode="billing" + ) + + # Check balance + checkResult = billingService.checkBalance(0.0) + + # Get mandate name from app interface + from modules.interfaces.interfaceDbApp import getInterface as getAppInterface + appInterface = getAppInterface(ctx.currentUser, mandateId=mandateId) + mandate = appInterface.getMandate(mandateId) + mandateName = mandate.get("name", "") if mandate else "" + + return BillingBalanceResponse( + mandateId=mandateId, + mandateName=mandateName, + billingModel=checkResult.billingModel or BillingModelEnum.UNLIMITED, + balance=checkResult.currentBalance or 0.0, + warningThreshold=0.0, # TODO: Get from account + isWarning=False, + creditLimit=None + ) + + except Exception as e: + logger.error(f"Error getting billing balance for mandate {mandateId}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/transactions", response_model=List[TransactionResponse]) +@limiter.limit("30/minute") +async def getTransactions( + request: Request, + limit: int = Query(default=50, ge=1, le=500), + offset: int = Query(default=0, ge=0), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get transaction history for the current mandate. + """ + try: + billingService = getBillingService( + ctx.currentUser, + ctx.mandateId, + featureCode="billing" + ) + + transactions = billingService.getTransactionHistory(limit=limit) + + # Convert to response model + result = [] + for t in transactions[offset:offset + limit]: + result.append(TransactionResponse( + id=t.get("id"), + accountId=t.get("accountId"), + transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")), + amount=t.get("amount", 0.0), + description=t.get("description", ""), + referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None, + workflowId=t.get("workflowId"), + featureCode=t.get("featureCode"), + aicoreProvider=t.get("aicoreProvider"), + createdAt=t.get("_createdAt") + )) + + return result + + except Exception as e: + logger.error(f"Error getting billing transactions: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/statistics/{period}", response_model=UsageReportResponse) +@limiter.limit("30/minute") +async def getStatistics( + request: Request, + period: str = Path(..., description="Period: 'day', 'month', or 'year'"), + year: int = Query(..., description="Year"), + month: Optional[int] = Query(None, description="Month (1-12, required for 'day' period)"), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get usage statistics for a period. + """ + try: + # Validate period + if period not in ["day", "month", "year"]: + raise HTTPException(status_code=400, detail="Invalid period. Use 'day', 'month', or 'year'") + + if period == "day" and not month: + raise HTTPException(status_code=400, detail="Month is required for 'day' period") + + billingInterface = getBillingInterface(ctx.currentUser, ctx.mandateId) + settings = billingInterface.getSettings(ctx.mandateId) + + if not settings: + return UsageReportResponse( + period=period, + totalCost=0.0, + transactionCount=0, + costByProvider={}, + costByFeature={} + ) + + billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + + # Get the relevant account + if billingModel == BillingModelEnum.PREPAY_USER: + account = billingInterface.getUserAccount(ctx.mandateId, ctx.currentUser.id) + else: + account = billingInterface.getMandateAccount(ctx.mandateId) + + if not account: + return UsageReportResponse( + period=period, + totalCost=0.0, + transactionCount=0, + costByProvider={}, + costByFeature={} + ) + + # Calculate date range + if period == "day": + startDate = date(year, month, 1) + if month == 12: + endDate = date(year + 1, 1, 1) + else: + endDate = date(year, month + 1, 1) + elif period == "month": + startDate = date(year, 1, 1) + endDate = date(year + 1, 1, 1) + else: # year + startDate = date(year, 1, 1) + endDate = date(year + 1, 1, 1) + + # Get statistics from transactions + stats = billingInterface.calculateStatisticsFromTransactions( + account["id"], + startDate, + endDate + ) + + return UsageReportResponse( + period=period, + totalCost=stats.get("totalCostCHF", 0.0), + transactionCount=stats.get("transactionCount", 0), + costByProvider=stats.get("costByProvider", {}), + costByFeature=stats.get("costByFeature", {}) + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting billing statistics: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/providers", response_model=List[str]) +@limiter.limit("60/minute") +async def getAllowedProviders( + request: Request, + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get list of AICore providers the current user is allowed to use. + """ + try: + billingService = getBillingService( + ctx.currentUser, + ctx.mandateId, + featureCode="billing" + ) + + return billingService.getallowedProviders() + + except Exception as e: + logger.error(f"Error getting allowed providers: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +# ============================================================================= +# Admin Endpoints +# ============================================================================= + +@router.get("/admin/settings/{mandateId}", response_model=Dict[str, Any]) +@limiter.limit("30/minute") +@requireSysAdmin +async def getSettingsAdmin( + request: Request, + mandateId: str = Path(..., description="Mandate ID"), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get billing settings for a mandate (SysAdmin only). + """ + try: + billingInterface = getBillingInterface(ctx.currentUser, mandateId) + settings = billingInterface.getSettings(mandateId) + + if not settings: + raise HTTPException(status_code=404, detail="Billing settings not found") + + return settings + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting billing settings: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/admin/settings/{mandateId}", response_model=Dict[str, Any]) +@limiter.limit("10/minute") +@requireSysAdmin +async def createOrUpdateSettings( + request: Request, + mandateId: str = Path(..., description="Mandate ID"), + settingsUpdate: BillingSettingsUpdate = Body(...), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Create or update billing settings for a mandate (SysAdmin only). + """ + try: + billingInterface = getBillingInterface(ctx.currentUser, mandateId) + existingSettings = billingInterface.getSettings(mandateId) + + if existingSettings: + # Update existing settings + updates = settingsUpdate.model_dump(exclude_none=True) + if updates: + result = billingInterface.updateSettings(existingSettings["id"], updates) + return result or existingSettings + return existingSettings + else: + # Create new settings + from modules.datamodels.datamodelBilling import BillingSettings + + newSettings = BillingSettings( + mandateId=mandateId, + billingModel=settingsUpdate.billingModel or BillingModelEnum.UNLIMITED, + defaultUserCredit=settingsUpdate.defaultUserCredit or 10.0, + warningThresholdPercent=settingsUpdate.warningThresholdPercent or 10.0, + blockOnZeroBalance=settingsUpdate.blockOnZeroBalance if settingsUpdate.blockOnZeroBalance is not None else True, + notifyOnWarning=settingsUpdate.notifyOnWarning if settingsUpdate.notifyOnWarning is not None else True, + notifyEmails=settingsUpdate.notifyEmails or [], + billingAddress=settingsUpdate.billingAddress + ) + + return billingInterface.createSettings(newSettings) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating billing settings: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/admin/credit/{mandateId}", response_model=Dict[str, Any]) +@limiter.limit("10/minute") +@requireSysAdmin +async def addCredit( + request: Request, + mandateId: str = Path(..., description="Mandate ID"), + creditRequest: CreditAddRequest = Body(...), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Add credit to a billing account (SysAdmin only). + For PREPAY_USER model, specify userId. For PREPAY_MANDATE, leave userId empty. + """ + try: + # Get settings to determine billing model + billingInterface = getBillingInterface(ctx.currentUser, mandateId) + settings = billingInterface.getSettings(mandateId) + + if not settings: + raise HTTPException(status_code=404, detail="Billing settings not found for this mandate") + + billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + + # Validate request based on billing model + if billingModel == BillingModelEnum.PREPAY_USER: + if not creditRequest.userId: + raise HTTPException(status_code=400, detail="userId is required for PREPAY_USER model") + + # Create user-level account if needed and add credit + account = billingInterface.getOrCreateUserAccount( + mandateId, + creditRequest.userId, + initialBalance=0.0 + ) + elif billingModel in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]: + # Create mandate-level account if needed and add credit + account = billingInterface.getOrCreateMandateAccount(mandateId, initialBalance=0.0) + else: + raise HTTPException(status_code=400, detail=f"Cannot add credit to {billingModel.value} billing model") + + # Create credit transaction + from modules.datamodels.datamodelBilling import BillingTransaction + + transaction = BillingTransaction( + accountId=account["id"], + transactionType=TransactionTypeEnum.CREDIT, + amount=creditRequest.amount, + description=creditRequest.description, + referenceType=ReferenceTypeEnum.ADMIN + ) + + result = billingInterface.createTransaction(transaction) + + logger.info(f"Added {creditRequest.amount} CHF credit to account {account['id']} in mandate {mandateId}") + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error adding credit: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/admin/accounts/{mandateId}", response_model=List[AccountSummary]) +@limiter.limit("30/minute") +@requireSysAdmin +async def getAccounts( + request: Request, + mandateId: str = Path(..., description="Mandate ID"), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get all billing accounts for a mandate (SysAdmin only). + """ + try: + billingInterface = getBillingInterface(ctx.currentUser, mandateId) + + # Get all accounts for this mandate + from modules.connectors.connectorDbPostgre import DatabaseConnector + from modules.shared.configuration import APP_CONFIG + from modules.datamodels.datamodelBilling import BillingAccount + + db = DatabaseConnector( + databaseName="poweron_billing", + host=APP_CONFIG.get('Database_Host', 'localhost'), + port=int(APP_CONFIG.get('Database_Port', '5432')), + user=APP_CONFIG.get('Database_User', 'admin'), + password=APP_CONFIG.get('Database_Password', 'admin') + ) + + accounts = db.getRecordset(BillingAccount, filterDict={"mandateId": mandateId}) + + result = [] + for acc in accounts: + result.append(AccountSummary( + id=acc.get("id"), + mandateId=acc.get("mandateId"), + userId=acc.get("userId"), + accountType=acc.get("accountType"), + balance=acc.get("balance", 0.0), + creditLimit=acc.get("creditLimit"), + warningThreshold=acc.get("warningThreshold", 0.0), + enabled=acc.get("enabled", True) + )) + + return result + + except Exception as e: + logger.error(f"Error getting billing accounts: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/admin/transactions/{mandateId}", response_model=List[TransactionResponse]) +@limiter.limit("30/minute") +@requireSysAdmin +async def getTransactionsAdmin( + request: Request, + mandateId: str = Path(..., description="Mandate ID"), + limit: int = Query(default=100, ge=1, le=1000), + ctx: RequestContext = Depends(getRequestContext) +): + """ + Get all transactions for a mandate (SysAdmin only). + """ + try: + billingInterface = getBillingInterface(ctx.currentUser, mandateId) + transactions = billingInterface.getTransactionsByMandate(mandateId, limit=limit) + + result = [] + for t in transactions: + result.append(TransactionResponse( + id=t.get("id"), + accountId=t.get("accountId"), + transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")), + amount=t.get("amount", 0.0), + description=t.get("description", ""), + referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None, + workflowId=t.get("workflowId"), + featureCode=t.get("featureCode"), + aicoreProvider=t.get("aicoreProvider"), + createdAt=t.get("_createdAt") + )) + + return result + + except Exception as e: + logger.error(f"Error getting billing transactions for mandate {mandateId}: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/modules/services/serviceBilling/__init__.py b/modules/services/serviceBilling/__init__.py new file mode 100644 index 00000000..ab0805d5 --- /dev/null +++ b/modules/services/serviceBilling/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Billing service module.""" + +from .mainServiceBilling import BillingService, getService + +__all__ = ["BillingService", "getService"] diff --git a/modules/services/serviceBilling/mainServiceBilling.py b/modules/services/serviceBilling/mainServiceBilling.py new file mode 100644 index 00000000..709ab61a --- /dev/null +++ b/modules/services/serviceBilling/mainServiceBilling.py @@ -0,0 +1,408 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Billing Service - Central service for billing operations. + +Handles: +- Balance checks before AI operations +- Cost recording after AI operations +- Provider permission checks via RBAC +- Price calculation with markup +""" + +import logging +from typing import Dict, Any, List, Optional +from datetime import datetime + +from modules.datamodels.datamodelUam import User +from modules.datamodels.datamodelBilling import ( + BillingModelEnum, + BillingCheckResult, + TransactionTypeEnum, + ReferenceTypeEnum, + BillingTransaction, + BillingBalanceResponse, +) +from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface + +logger = logging.getLogger(__name__) + +# Markup percentage for internal pricing (50% = 1.5x) +BILLING_MARKUP_PERCENT = 50 + +# Singleton cache +_billingServices: Dict[str, "BillingService"] = {} + + +def getService(currentUser: User, mandateId: str, featureInstanceId: str = None, featureCode: str = None) -> "BillingService": + """ + Factory function to get or create a BillingService instance. + + Args: + currentUser: Current user object + mandateId: Mandate ID for context + featureInstanceId: Optional feature instance ID + featureCode: Optional feature code (e.g., 'chatplayground', 'automation') + + Returns: + BillingService instance + """ + cacheKey = f"{currentUser.id}_{mandateId}_{featureInstanceId}" + + if cacheKey not in _billingServices: + _billingServices[cacheKey] = BillingService(currentUser, mandateId, featureInstanceId, featureCode) + else: + _billingServices[cacheKey].setContext(currentUser, mandateId, featureInstanceId, featureCode) + + return _billingServices[cacheKey] + + +class BillingService: + """ + Central billing service for AI operations. + + Responsibilities: + - Check balance before operations + - Record usage costs + - Apply pricing markup + - Check provider permissions via RBAC + """ + + def __init__( + self, + currentUser: User, + mandateId: str, + featureInstanceId: str = None, + featureCode: str = None + ): + """ + Initialize the billing service. + + Args: + currentUser: Current user object + mandateId: Mandate ID + featureInstanceId: Optional feature instance ID + featureCode: Optional feature code + """ + self.currentUser = currentUser + self.mandateId = mandateId + self.featureInstanceId = featureInstanceId + self.featureCode = featureCode + + # Get billing interface + self._billingInterface = getBillingInterface(currentUser, mandateId) + + # Cache settings + self._settingsCache = None + + def setContext( + self, + currentUser: User, + mandateId: str, + featureInstanceId: str = None, + featureCode: str = None + ): + """Update service context.""" + self.currentUser = currentUser + self.mandateId = mandateId + self.featureInstanceId = featureInstanceId + self.featureCode = featureCode + self._billingInterface = getBillingInterface(currentUser, mandateId) + self._settingsCache = None + + def _getSettings(self) -> Optional[Dict[str, Any]]: + """Get billing settings with caching.""" + if self._settingsCache is None: + self._settingsCache = self._billingInterface.getSettings(self.mandateId) + return self._settingsCache + + # ========================================================================= + # Price Calculation + # ========================================================================= + + def calculatePriceWithMarkup(self, basePriceCHF: float) -> float: + """ + Calculate final price with markup. + + The AICore plugins return prices in their original currency (USD). + This method applies the configured markup percentage. + + Args: + basePriceCHF: Base price from AI model (actually USD from provider) + + Returns: + Final price in CHF with markup applied + """ + if basePriceCHF <= 0: + return 0.0 + + # Apply markup (50% = multiply by 1.5) + markup_multiplier = 1 + (BILLING_MARKUP_PERCENT / 100) + return round(basePriceCHF * markup_multiplier, 6) + + # ========================================================================= + # Balance Operations + # ========================================================================= + + def checkBalance(self, estimatedCost: float = 0.0) -> BillingCheckResult: + """ + Check if the current user/mandate has sufficient balance. + + Args: + estimatedCost: Estimated cost of the operation (with markup applied) + + Returns: + BillingCheckResult indicating if operation is allowed + """ + return self._billingInterface.checkBalance( + self.mandateId, + self.currentUser.id, + estimatedCost + ) + + def hasBalance(self, estimatedCost: float = 0.0) -> bool: + """ + Quick check if balance is sufficient. + + Args: + estimatedCost: Estimated cost with markup + + Returns: + True if operation is allowed + """ + result = self.checkBalance(estimatedCost) + return result.allowed + + def getCurrentBalance(self) -> float: + """ + Get current balance for the user/mandate. + + Returns: + Current balance in CHF + """ + result = self.checkBalance(0.0) + return result.currentBalance or 0.0 + + # ========================================================================= + # Usage Recording + # ========================================================================= + + def recordUsage( + self, + priceCHF: float, + workflowId: str = None, + aicoreProvider: str = None, + description: str = None + ) -> Optional[Dict[str, Any]]: + """ + Record AI usage cost as a billing transaction. + + This method: + 1. Applies the pricing markup + 2. Creates a DEBIT transaction + 3. Updates the account balance + + Args: + priceCHF: Base price from AI model (before markup) + workflowId: Optional workflow ID + aicoreProvider: AICore provider name (e.g., 'anthropic', 'openai') + description: Optional description + + Returns: + Created transaction dict or None if not recorded + """ + if priceCHF <= 0: + return None + + # Apply markup + finalPrice = self.calculatePriceWithMarkup(priceCHF) + + if finalPrice <= 0: + return None + + # Build description + if not description: + description = f"AI Usage: {aicoreProvider or 'unknown'}" + + return self._billingInterface.recordUsage( + mandateId=self.mandateId, + userId=self.currentUser.id, + priceCHF=finalPrice, + workflowId=workflowId, + featureInstanceId=self.featureInstanceId, + featureCode=self.featureCode, + aicoreProvider=aicoreProvider, + description=description + ) + + # ========================================================================= + # Provider Permission Check (via RBAC) + # ========================================================================= + + def isProviderAllowed(self, provider: str) -> bool: + """ + Check if the user has permission to use an AICore provider. + + Uses RBAC to check for resource permission: + resource.aicore.{provider} + + Args: + provider: Provider name (e.g., 'anthropic', 'openai') + + Returns: + True if provider is allowed + """ + try: + from modules.security.rbac import RbacClass + from modules.datamodels.datamodelRbac import AccessRuleContext + from modules.connectors.connectorDbPostgre import DatabaseConnector + from modules.shared.configuration import APP_CONFIG + + # Get database connectors + dbApp = DatabaseConnector( + databaseName="poweron_app", + host=APP_CONFIG.get('Database_Host', 'localhost'), + port=int(APP_CONFIG.get('Database_Port', '5432')), + user=APP_CONFIG.get('Database_User', 'admin'), + password=APP_CONFIG.get('Database_Password', 'admin') + ) + + rbac = RbacClass(dbApp, dbApp) + resourceKey = f"resource.aicore.{provider}" + + # Check if user has view permission for this resource (view = use for RESOURCE context) + permissions = rbac.getUserPermissions( + self.currentUser, + AccessRuleContext.RESOURCE, + resourceKey, + mandateId=self.mandateId + ) + + return permissions.view + except Exception as e: + logger.warning(f"Error checking provider permission: {e}") + # Default to allowed if RBAC check fails + return True + + def getallowedProviders(self) -> List[str]: + """ + Get list of AICore providers the user is allowed to use. + + Returns: + List of allowed provider names + """ + try: + from modules.aicore.aicoreModelRegistry import modelRegistry + + # Get all available providers + connectors = modelRegistry.discoverConnectors() + allProviders = [c.getConnectorType() for c in connectors] + + # Filter by RBAC permissions + return [p for p in allProviders if self.isProviderAllowed(p)] + except Exception as e: + logger.warning(f"Error getting allowed providers: {e}") + return [] + + # ========================================================================= + # Admin Operations + # ========================================================================= + + def addCredit( + self, + amount: float, + description: str = "Manual credit", + referenceType: ReferenceTypeEnum = ReferenceTypeEnum.ADMIN + ) -> Optional[Dict[str, Any]]: + """ + Add credit to the account (admin operation). + + Args: + amount: Amount to credit (positive) + description: Transaction description + referenceType: Reference type (ADMIN, PAYMENT, SYSTEM) + + Returns: + Created transaction dict or None + """ + if amount <= 0: + return None + + settings = self._getSettings() + if not settings: + logger.warning(f"No billing settings for mandate {self.mandateId}") + return None + + billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + + # Get or create account + if billingModel == BillingModelEnum.PREPAY_USER: + account = self._billingInterface.getOrCreateUserAccount( + self.mandateId, + self.currentUser.id, + initialBalance=0.0 + ) + else: + account = self._billingInterface.getOrCreateMandateAccount( + self.mandateId, + initialBalance=0.0 + ) + + # Create credit transaction + transaction = BillingTransaction( + accountId=account["id"], + transactionType=TransactionTypeEnum.CREDIT, + amount=amount, + description=description, + referenceType=referenceType + ) + + return self._billingInterface.createTransaction(transaction) + + # ========================================================================= + # Statistics & Reporting + # ========================================================================= + + def getBalancesForUser(self) -> List[BillingBalanceResponse]: + """ + Get all billing balances for the current user. + + Returns: + List of balance responses for each mandate + """ + return self._billingInterface.getBalancesForUser(self.currentUser.id) + + def getTransactionHistory(self, limit: int = 100) -> List[Dict[str, Any]]: + """ + Get transaction history for the current mandate. + + Args: + limit: Maximum number of transactions + + Returns: + List of transactions + """ + return self._billingInterface.getTransactionsByMandate(self.mandateId, limit=limit) + + +# ============================================================================ +# Exception Classes +# ============================================================================ + +class InsufficientBalanceException(Exception): + """Raised when there's insufficient balance for an operation.""" + + def __init__(self, currentBalance: float, requiredAmount: float, message: str = None): + self.currentBalance = currentBalance + self.requiredAmount = requiredAmount + self.message = message or f"Insufficient balance. Current: {currentBalance:.2f} CHF, Required: {requiredAmount:.2f} CHF" + super().__init__(self.message) + + +class ProviderNotAllowedException(Exception): + """Raised when a user doesn't have permission to use an AI provider.""" + + def __init__(self, provider: str, message: str = None): + self.provider = provider + self.message = message or f"Provider '{provider}' is not allowed for your role" + super().__init__(self.message) diff --git a/modules/services/serviceChat/mainServiceChat.py b/modules/services/serviceChat/mainServiceChat.py index b7910720..37b232b8 100644 --- a/modules/services/serviceChat/mainServiceChat.py +++ b/modules/services/serviceChat/mainServiceChat.py @@ -674,7 +674,8 @@ class ChatService: return chatLog def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat: - """Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list.""" + """Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list. + Also records the usage cost to the billing system if configured.""" try: # Create ChatStat from AiCallResponse data statData = { @@ -696,10 +697,69 @@ class ChatService: workflow.stats = [] workflow.stats.append(stat) + # Record billing transaction if mandateId is available + self._recordBillingUsage(workflow, aiResponse, process) + return stat except Exception as e: logger.error(f"Failed to store workflow stat: {e}") raise + + def _recordBillingUsage(self, workflow: Any, aiResponse: Any, process: str) -> None: + """Record AI usage to the billing system. + + This method: + 1. Gets the mandate context from services + 2. Records the usage cost with 50% markup via BillingService + + Args: + workflow: ChatWorkflow object + aiResponse: AI call response with cost information + process: Process identifier for the AI call + """ + try: + # Check if we have mandate context + mandateId = getattr(self.services, 'mandateId', None) + if not mandateId: + logger.debug("No mandate context, skipping billing recording") + return + + # Check if there's a cost to record + priceCHF = getattr(aiResponse, 'priceCHF', 0.0) + if not priceCHF or priceCHF <= 0: + return + + # Extract provider from model name (e.g., "anthropic.claude-3-sonnet" -> "anthropic") + modelName = getattr(aiResponse, 'modelName', '') or '' + aicoreProvider = modelName.split('.')[0] if '.' in modelName else 'unknown' + + # Get feature context if available + featureInstanceId = getattr(self.services, 'featureInstanceId', None) + featureCode = getattr(self.services, 'featureCode', None) + + # Import and use BillingService + from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService + + billingService = getBillingService( + self.user, + mandateId, + featureInstanceId=featureInstanceId, + featureCode=featureCode + ) + + # Record the usage (includes 50% markup) + billingService.recordUsage( + priceCHF=priceCHF, + workflowId=workflow.id, + aicoreProvider=aicoreProvider, + description=f"AI Usage: {process}" + ) + + logger.debug(f"Recorded billing usage: {priceCHF} CHF for {process} (provider: {aicoreProvider})") + + except Exception as e: + # Don't fail the main operation if billing recording fails + logger.warning(f"Failed to record billing usage (non-critical): {e}") def updateMessage(self, messageId: str, messageData: Dict[str, Any]): """Update message by delegating to the chat interface""" diff --git a/modules/system/mainSystem.py b/modules/system/mainSystem.py index 9b300d78..e80efbe6 100644 --- a/modules/system/mainSystem.py +++ b/modules/system/mainSystem.py @@ -91,6 +91,29 @@ NAVIGATION_SECTIONS = [ }, ], }, + { + "id": "billing", + "title": {"en": "BILLING", "de": "BILLING", "fr": "FACTURATION"}, + "order": 35, + "items": [ + { + "id": "billing-dashboard", + "objectKey": "ui.billing.dashboard", + "label": {"en": "Balance", "de": "Guthaben", "fr": "Solde"}, + "icon": "FaWallet", + "path": "/billing", + "order": 10, + }, + { + "id": "billing-transactions", + "objectKey": "ui.billing.transactions", + "label": {"en": "Transactions", "de": "Transaktionen", "fr": "Transactions"}, + "icon": "FaListAlt", + "path": "/billing/transactions", + "order": 20, + }, + ], + }, { "id": "admin", "title": {"en": "ADMINISTRATION", "de": "ADMINISTRATION", "fr": "ADMINISTRATION"}, @@ -178,6 +201,15 @@ NAVIGATION_SECTIONS = [ "order": 50, "adminOnly": True, }, + { + "id": "admin-billing", + "objectKey": "ui.admin.billing", + "label": {"en": "Billing Administration", "de": "Billing-Verwaltung", "fr": "Administration de facturation"}, + "icon": "FaMoneyBillAlt", + "path": "/admin/billing", + "order": 60, + "adminOnly": True, + }, ], }, ]