billing initial

This commit is contained in:
ValueOn AG 2026-02-04 21:50:55 +01:00
parent 45eda1e4d4
commit d118128813
10 changed files with 2182 additions and 1 deletions

3
app.py
View file

@ -503,6 +503,9 @@ app.include_router(userAccessOverviewRouter)
from modules.routes.routeGdpr import router as gdprRouter
app.include_router(gdprRouter)
from modules.routes.routeBilling import router as billingRouter
app.include_router(billingRouter)
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================

View file

@ -0,0 +1,265 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Billing models: BillingAccount, BillingTransaction, BillingSettings, UsageStatistics."""
from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import date, datetime
from pydantic import BaseModel, Field
from modules.shared.attributeUtils import registerModelLabels
import uuid
class BillingModelEnum(str, Enum):
"""Billing model types."""
PREPAY_MANDATE = "PREPAY_MANDATE" # Prepaid budget shared by all users in mandate
PREPAY_USER = "PREPAY_USER" # Prepaid budget per user within mandate
CREDIT_POSTPAY = "CREDIT_POSTPAY" # Credit with monthly invoice (requires billing address)
UNLIMITED = "UNLIMITED" # No cost limitation (internal mandates only)
class AccountTypeEnum(str, Enum):
"""Account type for billing accounts."""
MANDATE = "MANDATE" # Account for entire mandate
USER = "USER" # Account for specific user within mandate
class TransactionTypeEnum(str, Enum):
"""Transaction types for billing."""
CREDIT = "CREDIT" # Credit/top-up (positive)
DEBIT = "DEBIT" # Debit/usage (positive amount, reduces balance)
ADJUSTMENT = "ADJUSTMENT" # Manual adjustment by admin
class ReferenceTypeEnum(str, Enum):
"""Reference types for transactions."""
WORKFLOW = "WORKFLOW" # AI workflow usage
PAYMENT = "PAYMENT" # Payment/top-up
ADMIN = "ADMIN" # Admin adjustment
SYSTEM = "SYSTEM" # System credit (e.g., initial credit)
class PeriodTypeEnum(str, Enum):
"""Period types for usage statistics."""
DAY = "DAY"
MONTH = "MONTH"
YEAR = "YEAR"
class BillingAddress(BaseModel):
"""Billing address for CREDIT_POSTPAY mandates."""
company: str = Field(..., description="Company name")
street: str = Field(..., description="Street and number")
zip: str = Field(..., description="Postal code")
city: str = Field(..., description="City")
country: str = Field(default="CH", description="Country code")
vatNumber: Optional[str] = Field(None, description="VAT number (optional)")
registerModelLabels(
"BillingAddress",
{"en": "Billing Address", "de": "Rechnungsadresse"},
{
"company": {"en": "Company", "de": "Firma"},
"street": {"en": "Street", "de": "Strasse"},
"zip": {"en": "ZIP", "de": "PLZ"},
"city": {"en": "City", "de": "Ort"},
"country": {"en": "Country", "de": "Land"},
"vatNumber": {"en": "VAT Number", "de": "MwSt-Nummer"},
},
)
class BillingAccount(BaseModel):
"""Billing account for mandate or user-mandate combination."""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()), description="Primary key"
)
mandateId: str = Field(..., description="Foreign key to Mandate")
userId: Optional[str] = Field(None, description="Foreign key to User (only for PREPAY_USER)")
accountType: AccountTypeEnum = Field(..., description="Account type: MANDATE or USER")
balance: float = Field(default=0.0, description="Current balance in CHF")
creditLimit: Optional[float] = Field(None, description="Credit limit in CHF (only for CREDIT_POSTPAY)")
warningThreshold: float = Field(default=0.0, description="Warning threshold in CHF")
lastWarningAt: Optional[datetime] = Field(None, description="Last warning sent timestamp")
enabled: bool = Field(default=True, description="Account is active")
registerModelLabels(
"BillingAccount",
{"en": "Billing Account", "de": "Abrechnungskonto"},
{
"id": {"en": "ID", "de": "ID"},
"mandateId": {"en": "Mandate ID", "de": "Mandanten-ID"},
"userId": {"en": "User ID", "de": "Benutzer-ID"},
"accountType": {"en": "Account Type", "de": "Kontotyp"},
"balance": {"en": "Balance (CHF)", "de": "Guthaben (CHF)"},
"creditLimit": {"en": "Credit Limit (CHF)", "de": "Kreditlimit (CHF)"},
"warningThreshold": {"en": "Warning Threshold (CHF)", "de": "Warnschwelle (CHF)"},
"lastWarningAt": {"en": "Last Warning", "de": "Letzte Warnung"},
"enabled": {"en": "Enabled", "de": "Aktiv"},
},
)
class BillingTransaction(BaseModel):
"""Single billing transaction (credit, debit, adjustment)."""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()), description="Primary key"
)
accountId: str = Field(..., description="Foreign key to BillingAccount")
transactionType: TransactionTypeEnum = Field(..., description="Transaction type")
amount: float = Field(..., description="Amount in CHF (always positive)")
description: str = Field(..., description="Transaction description")
# Reference to source
referenceType: Optional[ReferenceTypeEnum] = Field(None, description="Reference type")
referenceId: Optional[str] = Field(None, description="Reference ID")
# Context for workflow transactions
workflowId: Optional[str] = Field(None, description="Workflow ID (for WORKFLOW transactions)")
featureInstanceId: Optional[str] = Field(None, description="Feature instance ID")
featureCode: Optional[str] = Field(None, description="Feature code (e.g., chatplayground, automation)")
aicoreProvider: Optional[str] = Field(None, description="AICore provider (anthropic, openai, etc.)")
registerModelLabels(
"BillingTransaction",
{"en": "Billing Transaction", "de": "Transaktion"},
{
"id": {"en": "ID", "de": "ID"},
"accountId": {"en": "Account ID", "de": "Konto-ID"},
"transactionType": {"en": "Type", "de": "Typ"},
"amount": {"en": "Amount (CHF)", "de": "Betrag (CHF)"},
"description": {"en": "Description", "de": "Beschreibung"},
"referenceType": {"en": "Reference Type", "de": "Referenztyp"},
"referenceId": {"en": "Reference ID", "de": "Referenz-ID"},
"workflowId": {"en": "Workflow ID", "de": "Workflow-ID"},
"featureInstanceId": {"en": "Feature Instance ID", "de": "Feature-Instanz-ID"},
"featureCode": {"en": "Feature Code", "de": "Feature-Code"},
"aicoreProvider": {"en": "AI Provider", "de": "AI-Anbieter"},
},
)
class BillingSettings(BaseModel):
"""Billing settings per mandate."""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()), description="Primary key"
)
mandateId: str = Field(..., description="Foreign key to Mandate (UNIQUE)")
billingModel: BillingModelEnum = Field(..., description="Billing model")
# Configuration
defaultUserCredit: float = Field(default=10.0, description="Initial credit in CHF for new users (PREPAY_USER)")
warningThresholdPercent: float = Field(default=10.0, description="Warning threshold as percentage")
blockOnZeroBalance: bool = Field(default=True, description="Block AI features when balance is zero")
# Billing address (required for CREDIT_POSTPAY)
billingAddress: Optional[BillingAddress] = Field(None, description="Billing address")
# Notifications
notifyEmails: List[str] = Field(default_factory=list, description="Email addresses for billing notifications")
notifyOnWarning: bool = Field(default=True, description="Send email when warning threshold is reached")
registerModelLabels(
"BillingSettings",
{"en": "Billing Settings", "de": "Abrechnungseinstellungen"},
{
"id": {"en": "ID", "de": "ID"},
"mandateId": {"en": "Mandate ID", "de": "Mandanten-ID"},
"billingModel": {"en": "Billing Model", "de": "Abrechnungsmodell"},
"defaultUserCredit": {"en": "Default User Credit (CHF)", "de": "Standard-Startguthaben (CHF)"},
"warningThresholdPercent": {"en": "Warning Threshold (%)", "de": "Warnschwelle (%)"},
"blockOnZeroBalance": {"en": "Block on Zero Balance", "de": "Bei 0 blockieren"},
"billingAddress": {"en": "Billing Address", "de": "Rechnungsadresse"},
"notifyEmails": {"en": "Notification Emails", "de": "Benachrichtigungs-Emails"},
"notifyOnWarning": {"en": "Notify on Warning", "de": "Bei Warnung benachrichtigen"},
},
)
class UsageStatistics(BaseModel):
"""Aggregated usage statistics for quick retrieval."""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()), description="Primary key"
)
accountId: str = Field(..., description="Foreign key to BillingAccount")
periodType: PeriodTypeEnum = Field(..., description="Period type")
periodStart: date = Field(..., description="Period start date")
# Aggregated values
totalCostCHF: float = Field(default=0.0, description="Total cost in CHF")
transactionCount: int = Field(default=0, description="Number of transactions")
# Breakdown by provider
costByProvider: Dict[str, float] = Field(
default_factory=dict,
description="Cost breakdown by provider (e.g., {'anthropic': 12.50, 'openai': 8.30})"
)
# Breakdown by feature
costByFeature: Dict[str, float] = Field(
default_factory=dict,
description="Cost breakdown by feature (e.g., {'chatplayground': 15.00, 'automation': 5.80})"
)
registerModelLabels(
"UsageStatistics",
{"en": "Usage Statistics", "de": "Nutzungsstatistik"},
{
"id": {"en": "ID", "de": "ID"},
"accountId": {"en": "Account ID", "de": "Konto-ID"},
"periodType": {"en": "Period Type", "de": "Periodentyp"},
"periodStart": {"en": "Period Start", "de": "Periodenbeginn"},
"totalCostCHF": {"en": "Total Cost (CHF)", "de": "Gesamtkosten (CHF)"},
"transactionCount": {"en": "Transaction Count", "de": "Anzahl Transaktionen"},
"costByProvider": {"en": "Cost by Provider", "de": "Kosten nach Anbieter"},
"costByFeature": {"en": "Cost by Feature", "de": "Kosten nach Feature"},
},
)
# ============================================================================
# Response Models for API
# ============================================================================
class BillingBalanceResponse(BaseModel):
"""Response model for balance endpoint."""
mandateId: str
mandateName: str
billingModel: BillingModelEnum
balance: float
currency: str = "CHF"
warningThreshold: float
isWarning: bool
creditLimit: Optional[float] = None
class BillingStatisticsChartData(BaseModel):
"""Chart data point for statistics."""
label: str
totalCost: float
byProvider: Dict[str, float]
class BillingStatisticsResponse(BaseModel):
"""Response model for statistics endpoint."""
mandateId: str
period: PeriodTypeEnum
year: int
month: Optional[int] = None
currency: str = "CHF"
data: List[BillingStatisticsChartData]
totals: Dict[str, Any]
class BillingCheckResult(BaseModel):
"""Result of a billing balance check."""
allowed: bool
reason: Optional[str] = None
currentBalance: Optional[float] = None
requiredAmount: Optional[float] = None
billingModel: Optional[BillingModelEnum] = None

View file

@ -25,6 +25,7 @@ class AutomationDefinition(BaseModel):
eventId: Optional[str] = Field(None, description="Event ID from event management (None if not registered)", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False})
status: Optional[str] = Field(None, description="Status: 'active' if event is registered, 'inactive' if not (computed, readonly)", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False})
executionLogs: List[Dict[str, Any]] = Field(default_factory=list, description="List of execution logs, each containing timestamp, workflowId, status, and messages", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False})
allowedProviders: List[str] = Field(default_factory=list, description="List of allowed AICore providers (e.g., 'anthropic', 'openai'). Empty means all RBAC-permitted providers are allowed.", json_schema_extra={"frontend_type": "multiselect", "frontend_readonly": False, "frontend_required": False})
registerModelLabels(
@ -42,6 +43,7 @@ registerModelLabels(
"eventId": {"en": "Event ID", "ge": "Event-ID", "fr": "ID de l'événement"},
"status": {"en": "Status", "ge": "Status", "fr": "Statut"},
"executionLogs": {"en": "Execution Logs", "ge": "Ausführungsprotokolle", "fr": "Journaux d'exécution"},
"allowedProviders": {"en": "Allowed Providers", "ge": "Erlaubte Provider", "fr": "Fournisseurs autorisés"},
},
)

View file

@ -76,6 +76,10 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Initialize feature instances for root mandate
if mandateId:
initRootMandateFeatures(db, mandateId)
# Initialize billing settings for root mandate
if mandateId:
initRootMandateBilling(mandateId)
def initAutomationTemplates(dbApp: DatabaseConnector, adminUserId: Optional[str] = None) -> None:
@ -1192,6 +1196,115 @@ def _createResourceContextRules(db: DatabaseConnector) -> None:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(resourceRules)} RESOURCE context rules")
# Create AICore provider RBAC rules
_createAicoreProviderRules(db)
def _createAicoreProviderRules(db: DatabaseConnector) -> None:
"""
Create RBAC rules for AICore providers (resource.aicore.{provider}).
All roles get access to all providers by default.
NOTE: Provider list is dynamically discovered from AICore model registry.
Args:
db: Database connector instance
"""
try:
from modules.aicore.aicoreModelRegistry import modelRegistry
# Discover available connectors dynamically
connectors = modelRegistry.discoverConnectors()
providers = [c.getConnectorType() for c in connectors]
if not providers:
logger.warning("No AICore providers discovered, skipping provider RBAC rules")
return
logger.info(f"Creating RBAC rules for AICore providers: {providers}")
providerRules = []
# All roles get access to all providers (as per requirement)
for roleLabel in ["admin", "user", "viewer"]:
roleId = _getRoleId(db, roleLabel)
if not roleId:
continue
for provider in providers:
resourceKey = f"resource.aicore.{provider}"
# Check if rule already exists
existingRules = db.getRecordset(
AccessRule,
recordFilter={
"roleId": roleId,
"context": AccessRuleContext.RESOURCE.value,
"item": resourceKey
}
)
if not existingRules:
providerRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.RESOURCE,
item=resourceKey,
view=True, # view=True means "can use" for RESOURCE context
read=None,
create=None,
update=None,
delete=None,
))
for rule in providerRules:
db.recordCreate(AccessRule, rule)
if providerRules:
logger.info(f"Created {len(providerRules)} AICore provider RBAC rules")
else:
logger.debug("All AICore provider RBAC rules already exist")
except Exception as e:
logger.warning(f"Failed to create AICore provider RBAC rules: {e}")
def initRootMandateBilling(mandateId: str) -> None:
"""
Initialize billing settings for root mandate.
Root mandate uses PREPAY_USER model with 10 CHF initial credit per user.
Args:
mandateId: Root mandate ID
"""
try:
from modules.interfaces.interfaceDbBilling import _getRootInterface
from modules.datamodels.datamodelBilling import BillingSettings, BillingModelEnum
billingInterface = _getRootInterface()
# Check if settings already exist
existingSettings = billingInterface.getSettings(mandateId)
if existingSettings:
logger.info("Billing settings for root mandate already exist")
return
# Create billing settings for root mandate
settings = BillingSettings(
mandateId=mandateId,
billingModel=BillingModelEnum.PREPAY_USER,
defaultUserCredit=10.0, # 10 CHF initial credit per user
warningThresholdPercent=10.0,
blockOnZeroBalance=True,
notifyOnWarning=True
)
billingInterface.createSettings(settings)
logger.info(f"Created billing settings for root mandate: PREPAY_USER with 10 CHF default credit")
except Exception as e:
# Don't fail bootstrap if billing init fails
logger.warning(f"Failed to initialize root mandate billing (non-critical): {e}")
def assignInitialUserMemberships(

View file

@ -0,0 +1,734 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for Billing operations.
Manages billing accounts, transactions, and usage statistics.
All billing data is stored in the poweron_billing database.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import date, datetime, timedelta
import uuid
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcTimestamp
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelBilling import (
BillingAccount,
BillingTransaction,
BillingSettings,
UsageStatistics,
BillingAddress,
BillingModelEnum,
AccountTypeEnum,
TransactionTypeEnum,
ReferenceTypeEnum,
PeriodTypeEnum,
BillingBalanceResponse,
BillingCheckResult,
)
logger = logging.getLogger(__name__)
# Singleton factory for BillingObjects instances
_billingInterfaces: Dict[str, "BillingObjects"] = {}
# Database name for billing
BILLING_DATABASE = "poweron_billing"
def getInterface(currentUser: User, mandateId: str = None) -> "BillingObjects":
"""
Factory function to get or create a BillingObjects instance.
Args:
currentUser: Current user object
mandateId: Mandate ID for context
Returns:
BillingObjects instance
"""
cacheKey = f"{currentUser.id}_{mandateId}"
if cacheKey not in _billingInterfaces:
_billingInterfaces[cacheKey] = BillingObjects(currentUser, mandateId)
else:
_billingInterfaces[cacheKey].setUserContext(currentUser, mandateId)
return _billingInterfaces[cacheKey]
def _getRootInterface() -> "BillingObjects":
"""Get interface with system access for bootstrap operations."""
from modules.security.rootAccess import getRootUser
rootUser = getRootUser()
return BillingObjects(rootUser, mandateId=None)
class BillingObjects:
"""
Interface for billing operations.
Manages accounts, transactions, settings, and statistics.
"""
def __init__(self, currentUser: Optional[User] = None, mandateId: str = None):
"""
Initialize the billing interface.
Args:
currentUser: Current user object
mandateId: Mandate ID for context
"""
self.currentUser = currentUser
self.userId = currentUser.id if currentUser else None
self.mandateId = mandateId
# Initialize database connection
self._initializeDatabase()
def _initializeDatabase(self):
"""Initialize database connection."""
self.db = DatabaseConnector(
databaseName=BILLING_DATABASE,
host=APP_CONFIG.get('Database_Host', 'localhost'),
port=int(APP_CONFIG.get('Database_Port', '5432')),
user=APP_CONFIG.get('Database_User', 'admin'),
password=APP_CONFIG.get('Database_Password', 'admin')
)
def setUserContext(self, currentUser: User, mandateId: str = None):
"""
Update user context.
Args:
currentUser: Current user object
mandateId: Mandate ID for context
"""
self.currentUser = currentUser
self.userId = currentUser.id if currentUser else None
self.mandateId = mandateId
# =========================================================================
# BillingSettings Operations
# =========================================================================
def getSettings(self, mandateId: str) -> Optional[Dict[str, Any]]:
"""
Get billing settings for a mandate.
Args:
mandateId: Mandate ID
Returns:
BillingSettings dict or None if not found
"""
try:
results = self.db.getRecordset(
BillingSettings,
filterDict={"mandateId": mandateId}
)
return results[0] if results else None
except Exception as e:
logger.error(f"Error getting billing settings: {e}")
return None
def createSettings(self, settings: BillingSettings) -> Dict[str, Any]:
"""
Create billing settings for a mandate.
Args:
settings: BillingSettings object
Returns:
Created settings dict
"""
settingsDict = settings.model_dump(exclude_none=True)
# Handle nested BillingAddress
if settings.billingAddress:
settingsDict["billingAddress"] = settings.billingAddress.model_dump()
return self.db.recordCreate(BillingSettings, settingsDict)
def updateSettings(self, settingsId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Update billing settings.
Args:
settingsId: Settings ID
updates: Fields to update
Returns:
Updated settings dict or None
"""
return self.db.recordModify(BillingSettings, settingsId, updates)
def getOrCreateSettings(self, mandateId: str, defaultModel: BillingModelEnum = BillingModelEnum.UNLIMITED) -> Dict[str, Any]:
"""
Get or create billing settings for a mandate.
Args:
mandateId: Mandate ID
defaultModel: Default billing model if creating
Returns:
BillingSettings dict
"""
existing = self.getSettings(mandateId)
if existing:
return existing
settings = BillingSettings(
mandateId=mandateId,
billingModel=defaultModel,
defaultUserCredit=10.0,
warningThresholdPercent=10.0,
blockOnZeroBalance=True,
notifyOnWarning=True
)
return self.createSettings(settings)
# =========================================================================
# BillingAccount Operations
# =========================================================================
def getAccount(self, accountId: str) -> Optional[Dict[str, Any]]:
"""Get a billing account by ID."""
try:
results = self.db.getRecordset(
BillingAccount,
filterDict={"id": accountId}
)
return results[0] if results else None
except Exception as e:
logger.error(f"Error getting billing account: {e}")
return None
def getMandateAccount(self, mandateId: str) -> Optional[Dict[str, Any]]:
"""
Get the mandate-level billing account.
Args:
mandateId: Mandate ID
Returns:
BillingAccount dict or None
"""
try:
results = self.db.getRecordset(
BillingAccount,
filterDict={
"mandateId": mandateId,
"accountType": AccountTypeEnum.MANDATE.value
}
)
return results[0] if results else None
except Exception as e:
logger.error(f"Error getting mandate account: {e}")
return None
def getUserAccount(self, mandateId: str, userId: str) -> Optional[Dict[str, Any]]:
"""
Get a user-level billing account within a mandate.
Args:
mandateId: Mandate ID
userId: User ID
Returns:
BillingAccount dict or None
"""
try:
results = self.db.getRecordset(
BillingAccount,
filterDict={
"mandateId": mandateId,
"userId": userId,
"accountType": AccountTypeEnum.USER.value
}
)
return results[0] if results else None
except Exception as e:
logger.error(f"Error getting user account: {e}")
return None
def createAccount(self, account: BillingAccount) -> Dict[str, Any]:
"""
Create a new billing account.
Args:
account: BillingAccount object
Returns:
Created account dict
"""
accountDict = account.model_dump(exclude_none=True)
return self.db.recordCreate(BillingAccount, accountDict)
def updateAccountBalance(self, accountId: str, newBalance: float) -> Optional[Dict[str, Any]]:
"""
Update account balance atomically.
Args:
accountId: Account ID
newBalance: New balance value
Returns:
Updated account dict or None
"""
return self.db.recordModify(BillingAccount, accountId, {"balance": newBalance})
def getOrCreateMandateAccount(self, mandateId: str, initialBalance: float = 0.0) -> Dict[str, Any]:
"""
Get or create a mandate-level billing account.
Args:
mandateId: Mandate ID
initialBalance: Initial balance if creating
Returns:
BillingAccount dict
"""
existing = self.getMandateAccount(mandateId)
if existing:
return existing
account = BillingAccount(
mandateId=mandateId,
accountType=AccountTypeEnum.MANDATE,
balance=initialBalance,
enabled=True
)
return self.createAccount(account)
def getOrCreateUserAccount(self, mandateId: str, userId: str, initialBalance: float = 0.0) -> Dict[str, Any]:
"""
Get or create a user-level billing account.
Args:
mandateId: Mandate ID
userId: User ID
initialBalance: Initial balance if creating
Returns:
BillingAccount dict
"""
existing = self.getUserAccount(mandateId, userId)
if existing:
return existing
account = BillingAccount(
mandateId=mandateId,
userId=userId,
accountType=AccountTypeEnum.USER,
balance=initialBalance,
enabled=True
)
created = self.createAccount(account)
# If initial balance > 0, create a SYSTEM credit transaction
if initialBalance > 0:
self.createTransaction(BillingTransaction(
accountId=created["id"],
transactionType=TransactionTypeEnum.CREDIT,
amount=initialBalance,
description="Initial credit for new user",
referenceType=ReferenceTypeEnum.SYSTEM
))
return created
# =========================================================================
# BillingTransaction Operations
# =========================================================================
def createTransaction(self, transaction: BillingTransaction) -> Dict[str, Any]:
"""
Create a new billing transaction and update account balance.
Args:
transaction: BillingTransaction object
Returns:
Created transaction dict
"""
# Get current account
account = self.getAccount(transaction.accountId)
if not account:
raise ValueError(f"Account {transaction.accountId} not found")
currentBalance = account.get("balance", 0.0)
# Calculate new balance
if transaction.transactionType == TransactionTypeEnum.CREDIT:
newBalance = currentBalance + transaction.amount
elif transaction.transactionType == TransactionTypeEnum.DEBIT:
newBalance = currentBalance - transaction.amount
else: # ADJUSTMENT
newBalance = currentBalance + transaction.amount # Can be positive or negative
# Create transaction
transactionDict = transaction.model_dump(exclude_none=True)
created = self.db.recordCreate(BillingTransaction, transactionDict)
# Update account balance
self.updateAccountBalance(transaction.accountId, newBalance)
logger.info(f"Billing transaction created: {transaction.transactionType.value} {transaction.amount} CHF, "
f"balance: {currentBalance} -> {newBalance}")
return created
def getTransactions(
self,
accountId: str,
limit: int = 100,
offset: int = 0,
startDate: date = None,
endDate: date = None
) -> List[Dict[str, Any]]:
"""
Get transactions for an account.
Args:
accountId: Account ID
limit: Maximum number of results
offset: Offset for pagination
startDate: Filter by start date
endDate: Filter by end date
Returns:
List of transaction dicts
"""
try:
filterDict = {"accountId": accountId}
results = self.db.getRecordset(BillingTransaction, filterDict=filterDict)
# Apply date filters if provided
if startDate or endDate:
filtered = []
for t in results:
createdAt = t.get("_createdAt")
if createdAt:
tDate = createdAt.date() if isinstance(createdAt, datetime) else createdAt
if startDate and tDate < startDate:
continue
if endDate and tDate > endDate:
continue
filtered.append(t)
results = filtered
# Sort by creation date descending
results.sort(key=lambda x: x.get("_createdAt", ""), reverse=True)
# Apply pagination
return results[offset:offset + limit]
except Exception as e:
logger.error(f"Error getting transactions: {e}")
return []
def getTransactionsByMandate(self, mandateId: str, limit: int = 100) -> List[Dict[str, Any]]:
"""
Get all transactions for a mandate (across all accounts).
Args:
mandateId: Mandate ID
limit: Maximum number of results
Returns:
List of transaction dicts
"""
# Get all accounts for mandate
accounts = self.db.getRecordset(BillingAccount, filterDict={"mandateId": mandateId})
allTransactions = []
for account in accounts:
transactions = self.getTransactions(account["id"], limit=limit)
allTransactions.extend(transactions)
# Sort by creation date descending and limit
allTransactions.sort(key=lambda x: x.get("_createdAt", ""), reverse=True)
return allTransactions[:limit]
# =========================================================================
# Balance Check Operations
# =========================================================================
def checkBalance(self, mandateId: str, userId: str, estimatedCost: float) -> BillingCheckResult:
"""
Check if there's sufficient balance for an operation.
Args:
mandateId: Mandate ID
userId: User ID
estimatedCost: Estimated cost of the operation
Returns:
BillingCheckResult
"""
settings = self.getSettings(mandateId)
if not settings:
# No settings = no billing = allowed
return BillingCheckResult(allowed=True, billingModel=BillingModelEnum.UNLIMITED)
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# UNLIMITED = always allowed
if billingModel == BillingModelEnum.UNLIMITED:
return BillingCheckResult(allowed=True, billingModel=billingModel)
# Get the relevant account
if billingModel == BillingModelEnum.PREPAY_USER:
account = self.getUserAccount(mandateId, userId)
else:
account = self.getMandateAccount(mandateId)
if not account:
# No account = no balance = potentially blocked
if settings.get("blockOnZeroBalance", True):
return BillingCheckResult(
allowed=False,
reason="NO_ACCOUNT",
currentBalance=0.0,
requiredAmount=estimatedCost,
billingModel=billingModel
)
return BillingCheckResult(allowed=True, currentBalance=0.0, billingModel=billingModel)
currentBalance = account.get("balance", 0.0)
# CREDIT_POSTPAY with credit limit check
if billingModel == BillingModelEnum.CREDIT_POSTPAY:
creditLimit = account.get("creditLimit")
if creditLimit and abs(currentBalance) + estimatedCost > creditLimit:
return BillingCheckResult(
allowed=False,
reason="CREDIT_LIMIT_EXCEEDED",
currentBalance=currentBalance,
requiredAmount=estimatedCost,
billingModel=billingModel
)
return BillingCheckResult(allowed=True, currentBalance=currentBalance, billingModel=billingModel)
# PREPAY models - check balance
if currentBalance < estimatedCost:
if settings.get("blockOnZeroBalance", True):
return BillingCheckResult(
allowed=False,
reason="INSUFFICIENT_BALANCE",
currentBalance=currentBalance,
requiredAmount=estimatedCost,
billingModel=billingModel
)
return BillingCheckResult(allowed=True, currentBalance=currentBalance, billingModel=billingModel)
def recordUsage(
self,
mandateId: str,
userId: str,
priceCHF: float,
workflowId: str = None,
featureInstanceId: str = None,
featureCode: str = None,
aicoreProvider: str = None,
description: str = "AI Usage"
) -> Optional[Dict[str, Any]]:
"""
Record usage cost as a billing transaction.
Args:
mandateId: Mandate ID
userId: User ID
priceCHF: Cost in CHF
workflowId: Optional workflow ID
featureInstanceId: Optional feature instance ID
featureCode: Optional feature code
aicoreProvider: Optional AICore provider name
description: Transaction description
Returns:
Created transaction dict or None
"""
if priceCHF <= 0:
return None
settings = self.getSettings(mandateId)
if not settings:
logger.debug(f"No billing settings for mandate {mandateId}, skipping usage recording")
return None
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# UNLIMITED = no transaction recording
if billingModel == BillingModelEnum.UNLIMITED:
return None
# Get or create the relevant account
if billingModel == BillingModelEnum.PREPAY_USER:
account = self.getOrCreateUserAccount(mandateId, userId)
else:
account = self.getOrCreateMandateAccount(mandateId)
# Create debit transaction
transaction = BillingTransaction(
accountId=account["id"],
transactionType=TransactionTypeEnum.DEBIT,
amount=priceCHF,
description=description,
referenceType=ReferenceTypeEnum.WORKFLOW,
workflowId=workflowId,
featureInstanceId=featureInstanceId,
featureCode=featureCode,
aicoreProvider=aicoreProvider
)
return self.createTransaction(transaction)
# =========================================================================
# Statistics Operations
# =========================================================================
def getUsageStatistics(
self,
accountId: str,
periodType: PeriodTypeEnum,
year: int,
month: int = None
) -> List[Dict[str, Any]]:
"""
Get usage statistics for an account.
Args:
accountId: Account ID
periodType: Period type (DAY, MONTH, YEAR)
year: Year
month: Month (for DAY period type)
Returns:
List of statistics dicts
"""
filterDict = {
"accountId": accountId,
"periodType": periodType.value
}
results = self.db.getRecordset(UsageStatistics, filterDict=filterDict)
# Filter by year
filtered = [s for s in results if s.get("periodStart") and s["periodStart"].year == year]
# Filter by month if specified
if month and periodType == PeriodTypeEnum.DAY:
filtered = [s for s in filtered if s["periodStart"].month == month]
return sorted(filtered, key=lambda x: x.get("periodStart", date.min))
def calculateStatisticsFromTransactions(
self,
accountId: str,
startDate: date,
endDate: date
) -> Dict[str, Any]:
"""
Calculate statistics from transactions for a period.
Args:
accountId: Account ID
startDate: Start date
endDate: End date
Returns:
Statistics dict
"""
transactions = self.getTransactions(accountId, limit=10000, startDate=startDate, endDate=endDate)
# Filter only DEBIT transactions (usage)
debits = [t for t in transactions if t.get("transactionType") == TransactionTypeEnum.DEBIT.value]
totalCost = sum(t.get("amount", 0) for t in debits)
# Calculate by provider
costByProvider = {}
for t in debits:
provider = t.get("aicoreProvider", "unknown")
costByProvider[provider] = costByProvider.get(provider, 0) + t.get("amount", 0)
# Calculate by feature
costByFeature = {}
for t in debits:
feature = t.get("featureCode", "unknown")
costByFeature[feature] = costByFeature.get(feature, 0) + t.get("amount", 0)
return {
"totalCostCHF": totalCost,
"transactionCount": len(debits),
"costByProvider": costByProvider,
"costByFeature": costByFeature
}
# =========================================================================
# Utility Methods
# =========================================================================
def getBalancesForUser(self, userId: str) -> List[BillingBalanceResponse]:
"""
Get all billing balances for a user across mandates.
Args:
userId: User ID
Returns:
List of BillingBalanceResponse
"""
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
balances = []
# Get all mandates the user belongs to
try:
appInterface = getAppInterface(self.currentUser)
userMandates = appInterface.getUserMandates(userId)
for um in userMandates:
mandateId = um.get("mandateId")
mandate = appInterface.getMandate(mandateId)
if not mandate:
continue
settings = self.getSettings(mandateId)
if not settings:
continue
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# Get the relevant account
if billingModel == BillingModelEnum.PREPAY_USER:
account = self.getUserAccount(mandateId, userId)
elif billingModel in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]:
account = self.getMandateAccount(mandateId)
else:
continue
if not account:
continue
balance = account.get("balance", 0.0)
warningThreshold = account.get("warningThreshold", 0.0)
balances.append(BillingBalanceResponse(
mandateId=mandateId,
mandateName=mandate.get("name", ""),
billingModel=billingModel,
balance=balance,
warningThreshold=warningThreshold,
isWarning=balance <= warningThreshold,
creditLimit=account.get("creditLimit")
))
except Exception as e:
logger.error(f"Error getting balances for user: {e}")
return balances

View file

@ -0,0 +1,557 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Billing routes for the backend API.
Implements the endpoints for billing management and usage tracking.
Features:
- User endpoints: View balance, transactions, statistics
- Admin endpoints: Manage settings, add credits, view all accounts
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from datetime import date, datetime
from pydantic import BaseModel, Field
# Import auth module
from modules.auth import limiter, requireSysAdmin, getRequestContext, RequestContext
# Import billing components
from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService
from modules.datamodels.datamodelBilling import (
BillingAccount,
BillingTransaction,
BillingSettings,
BillingAddress,
BillingModelEnum,
TransactionTypeEnum,
ReferenceTypeEnum,
PeriodTypeEnum,
BillingBalanceResponse,
BillingStatisticsResponse,
BillingStatisticsChartData,
BillingCheckResult,
)
# Configure logger
logger = logging.getLogger(__name__)
# =============================================================================
# Request/Response Models
# =============================================================================
class CreditAddRequest(BaseModel):
"""Request model for adding credit to an account."""
userId: Optional[str] = Field(None, description="Target user ID (for PREPAY_USER model)")
amount: float = Field(..., gt=0, description="Amount to credit in CHF")
description: str = Field(default="Manual credit", description="Transaction description")
class BillingSettingsUpdate(BaseModel):
"""Request model for updating billing settings."""
billingModel: Optional[BillingModelEnum] = None
defaultUserCredit: Optional[float] = Field(None, ge=0)
warningThresholdPercent: Optional[float] = Field(None, ge=0, le=100)
blockOnZeroBalance: Optional[bool] = None
notifyOnWarning: Optional[bool] = None
notifyEmails: Optional[List[str]] = None
billingAddress: Optional[BillingAddress] = None
class TransactionResponse(BaseModel):
"""Response model for a billing transaction."""
id: str
accountId: str
transactionType: TransactionTypeEnum
amount: float
description: str
referenceType: Optional[ReferenceTypeEnum]
workflowId: Optional[str]
featureCode: Optional[str]
aicoreProvider: Optional[str]
createdAt: Optional[datetime]
class AccountSummary(BaseModel):
"""Summary of a billing account."""
id: str
mandateId: str
userId: Optional[str]
accountType: str
balance: float
creditLimit: Optional[float]
warningThreshold: float
enabled: bool
class UsageReportResponse(BaseModel):
"""Usage report for a period."""
period: str
totalCost: float
transactionCount: int
costByProvider: Dict[str, float]
costByFeature: Dict[str, float]
# =============================================================================
# Router Setup
# =============================================================================
router = APIRouter(
prefix="/api/billing",
tags=["Billing"],
responses={404: {"description": "Not found"}}
)
# =============================================================================
# User Endpoints
# =============================================================================
@router.get("/balance", response_model=List[BillingBalanceResponse])
@limiter.limit("60/minute")
async def getBalance(
request: Request,
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get billing balances for all mandates the current user belongs to.
Returns balance information for each mandate.
"""
try:
billingService = getBillingService(
ctx.currentUser,
ctx.mandateId,
featureCode="billing"
)
balances = billingService.getBalancesForUser()
return balances
except Exception as e:
logger.error(f"Error getting billing balance: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/balance/{mandateId}", response_model=BillingBalanceResponse)
@limiter.limit("60/minute")
async def getBalanceForMandate(
request: Request,
mandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get billing balance for a specific mandate.
"""
try:
billingService = getBillingService(
ctx.currentUser,
mandateId,
featureCode="billing"
)
# Check balance
checkResult = billingService.checkBalance(0.0)
# Get mandate name from app interface
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
appInterface = getAppInterface(ctx.currentUser, mandateId=mandateId)
mandate = appInterface.getMandate(mandateId)
mandateName = mandate.get("name", "") if mandate else ""
return BillingBalanceResponse(
mandateId=mandateId,
mandateName=mandateName,
billingModel=checkResult.billingModel or BillingModelEnum.UNLIMITED,
balance=checkResult.currentBalance or 0.0,
warningThreshold=0.0, # TODO: Get from account
isWarning=False,
creditLimit=None
)
except Exception as e:
logger.error(f"Error getting billing balance for mandate {mandateId}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
async def getTransactions(
request: Request,
limit: int = Query(default=50, ge=1, le=500),
offset: int = Query(default=0, ge=0),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get transaction history for the current mandate.
"""
try:
billingService = getBillingService(
ctx.currentUser,
ctx.mandateId,
featureCode="billing"
)
transactions = billingService.getTransactionHistory(limit=limit)
# Convert to response model
result = []
for t in transactions[offset:offset + limit]:
result.append(TransactionResponse(
id=t.get("id"),
accountId=t.get("accountId"),
transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
amount=t.get("amount", 0.0),
description=t.get("description", ""),
referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
workflowId=t.get("workflowId"),
featureCode=t.get("featureCode"),
aicoreProvider=t.get("aicoreProvider"),
createdAt=t.get("_createdAt")
))
return result
except Exception as e:
logger.error(f"Error getting billing transactions: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/{period}", response_model=UsageReportResponse)
@limiter.limit("30/minute")
async def getStatistics(
request: Request,
period: str = Path(..., description="Period: 'day', 'month', or 'year'"),
year: int = Query(..., description="Year"),
month: Optional[int] = Query(None, description="Month (1-12, required for 'day' period)"),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get usage statistics for a period.
"""
try:
# Validate period
if period not in ["day", "month", "year"]:
raise HTTPException(status_code=400, detail="Invalid period. Use 'day', 'month', or 'year'")
if period == "day" and not month:
raise HTTPException(status_code=400, detail="Month is required for 'day' period")
billingInterface = getBillingInterface(ctx.currentUser, ctx.mandateId)
settings = billingInterface.getSettings(ctx.mandateId)
if not settings:
return UsageReportResponse(
period=period,
totalCost=0.0,
transactionCount=0,
costByProvider={},
costByFeature={}
)
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# Get the relevant account
if billingModel == BillingModelEnum.PREPAY_USER:
account = billingInterface.getUserAccount(ctx.mandateId, ctx.currentUser.id)
else:
account = billingInterface.getMandateAccount(ctx.mandateId)
if not account:
return UsageReportResponse(
period=period,
totalCost=0.0,
transactionCount=0,
costByProvider={},
costByFeature={}
)
# Calculate date range
if period == "day":
startDate = date(year, month, 1)
if month == 12:
endDate = date(year + 1, 1, 1)
else:
endDate = date(year, month + 1, 1)
elif period == "month":
startDate = date(year, 1, 1)
endDate = date(year + 1, 1, 1)
else: # year
startDate = date(year, 1, 1)
endDate = date(year + 1, 1, 1)
# Get statistics from transactions
stats = billingInterface.calculateStatisticsFromTransactions(
account["id"],
startDate,
endDate
)
return UsageReportResponse(
period=period,
totalCost=stats.get("totalCostCHF", 0.0),
transactionCount=stats.get("transactionCount", 0),
costByProvider=stats.get("costByProvider", {}),
costByFeature=stats.get("costByFeature", {})
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting billing statistics: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/providers", response_model=List[str])
@limiter.limit("60/minute")
async def getAllowedProviders(
request: Request,
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get list of AICore providers the current user is allowed to use.
"""
try:
billingService = getBillingService(
ctx.currentUser,
ctx.mandateId,
featureCode="billing"
)
return billingService.getallowedProviders()
except Exception as e:
logger.error(f"Error getting allowed providers: {e}")
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Admin Endpoints
# =============================================================================
@router.get("/admin/settings/{mandateId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
@requireSysAdmin
async def getSettingsAdmin(
request: Request,
mandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get billing settings for a mandate (SysAdmin only).
"""
try:
billingInterface = getBillingInterface(ctx.currentUser, mandateId)
settings = billingInterface.getSettings(mandateId)
if not settings:
raise HTTPException(status_code=404, detail="Billing settings not found")
return settings
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting billing settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/admin/settings/{mandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
@requireSysAdmin
async def createOrUpdateSettings(
request: Request,
mandateId: str = Path(..., description="Mandate ID"),
settingsUpdate: BillingSettingsUpdate = Body(...),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Create or update billing settings for a mandate (SysAdmin only).
"""
try:
billingInterface = getBillingInterface(ctx.currentUser, mandateId)
existingSettings = billingInterface.getSettings(mandateId)
if existingSettings:
# Update existing settings
updates = settingsUpdate.model_dump(exclude_none=True)
if updates:
result = billingInterface.updateSettings(existingSettings["id"], updates)
return result or existingSettings
return existingSettings
else:
# Create new settings
from modules.datamodels.datamodelBilling import BillingSettings
newSettings = BillingSettings(
mandateId=mandateId,
billingModel=settingsUpdate.billingModel or BillingModelEnum.UNLIMITED,
defaultUserCredit=settingsUpdate.defaultUserCredit or 10.0,
warningThresholdPercent=settingsUpdate.warningThresholdPercent or 10.0,
blockOnZeroBalance=settingsUpdate.blockOnZeroBalance if settingsUpdate.blockOnZeroBalance is not None else True,
notifyOnWarning=settingsUpdate.notifyOnWarning if settingsUpdate.notifyOnWarning is not None else True,
notifyEmails=settingsUpdate.notifyEmails or [],
billingAddress=settingsUpdate.billingAddress
)
return billingInterface.createSettings(newSettings)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating billing settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/admin/credit/{mandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
@requireSysAdmin
async def addCredit(
request: Request,
mandateId: str = Path(..., description="Mandate ID"),
creditRequest: CreditAddRequest = Body(...),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Add credit to a billing account (SysAdmin only).
For PREPAY_USER model, specify userId. For PREPAY_MANDATE, leave userId empty.
"""
try:
# Get settings to determine billing model
billingInterface = getBillingInterface(ctx.currentUser, mandateId)
settings = billingInterface.getSettings(mandateId)
if not settings:
raise HTTPException(status_code=404, detail="Billing settings not found for this mandate")
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# Validate request based on billing model
if billingModel == BillingModelEnum.PREPAY_USER:
if not creditRequest.userId:
raise HTTPException(status_code=400, detail="userId is required for PREPAY_USER model")
# Create user-level account if needed and add credit
account = billingInterface.getOrCreateUserAccount(
mandateId,
creditRequest.userId,
initialBalance=0.0
)
elif billingModel in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]:
# Create mandate-level account if needed and add credit
account = billingInterface.getOrCreateMandateAccount(mandateId, initialBalance=0.0)
else:
raise HTTPException(status_code=400, detail=f"Cannot add credit to {billingModel.value} billing model")
# Create credit transaction
from modules.datamodels.datamodelBilling import BillingTransaction
transaction = BillingTransaction(
accountId=account["id"],
transactionType=TransactionTypeEnum.CREDIT,
amount=creditRequest.amount,
description=creditRequest.description,
referenceType=ReferenceTypeEnum.ADMIN
)
result = billingInterface.createTransaction(transaction)
logger.info(f"Added {creditRequest.amount} CHF credit to account {account['id']} in mandate {mandateId}")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error adding credit: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/accounts/{mandateId}", response_model=List[AccountSummary])
@limiter.limit("30/minute")
@requireSysAdmin
async def getAccounts(
request: Request,
mandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get all billing accounts for a mandate (SysAdmin only).
"""
try:
billingInterface = getBillingInterface(ctx.currentUser, mandateId)
# Get all accounts for this mandate
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelBilling import BillingAccount
db = DatabaseConnector(
databaseName="poweron_billing",
host=APP_CONFIG.get('Database_Host', 'localhost'),
port=int(APP_CONFIG.get('Database_Port', '5432')),
user=APP_CONFIG.get('Database_User', 'admin'),
password=APP_CONFIG.get('Database_Password', 'admin')
)
accounts = db.getRecordset(BillingAccount, filterDict={"mandateId": mandateId})
result = []
for acc in accounts:
result.append(AccountSummary(
id=acc.get("id"),
mandateId=acc.get("mandateId"),
userId=acc.get("userId"),
accountType=acc.get("accountType"),
balance=acc.get("balance", 0.0),
creditLimit=acc.get("creditLimit"),
warningThreshold=acc.get("warningThreshold", 0.0),
enabled=acc.get("enabled", True)
))
return result
except Exception as e:
logger.error(f"Error getting billing accounts: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/transactions/{mandateId}", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
@requireSysAdmin
async def getTransactionsAdmin(
request: Request,
mandateId: str = Path(..., description="Mandate ID"),
limit: int = Query(default=100, ge=1, le=1000),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get all transactions for a mandate (SysAdmin only).
"""
try:
billingInterface = getBillingInterface(ctx.currentUser, mandateId)
transactions = billingInterface.getTransactionsByMandate(mandateId, limit=limit)
result = []
for t in transactions:
result.append(TransactionResponse(
id=t.get("id"),
accountId=t.get("accountId"),
transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
amount=t.get("amount", 0.0),
description=t.get("description", ""),
referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
workflowId=t.get("workflowId"),
featureCode=t.get("featureCode"),
aicoreProvider=t.get("aicoreProvider"),
createdAt=t.get("_createdAt")
))
return result
except Exception as e:
logger.error(f"Error getting billing transactions for mandate {mandateId}: {e}")
raise HTTPException(status_code=500, detail=str(e))

View file

@ -0,0 +1,7 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Billing service module."""
from .mainServiceBilling import BillingService, getService
__all__ = ["BillingService", "getService"]

View file

@ -0,0 +1,408 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Billing Service - Central service for billing operations.
Handles:
- Balance checks before AI operations
- Cost recording after AI operations
- Provider permission checks via RBAC
- Price calculation with markup
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelBilling import (
BillingModelEnum,
BillingCheckResult,
TransactionTypeEnum,
ReferenceTypeEnum,
BillingTransaction,
BillingBalanceResponse,
)
from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
logger = logging.getLogger(__name__)
# Markup percentage for internal pricing (50% = 1.5x)
BILLING_MARKUP_PERCENT = 50
# Singleton cache
_billingServices: Dict[str, "BillingService"] = {}
def getService(currentUser: User, mandateId: str, featureInstanceId: str = None, featureCode: str = None) -> "BillingService":
"""
Factory function to get or create a BillingService instance.
Args:
currentUser: Current user object
mandateId: Mandate ID for context
featureInstanceId: Optional feature instance ID
featureCode: Optional feature code (e.g., 'chatplayground', 'automation')
Returns:
BillingService instance
"""
cacheKey = f"{currentUser.id}_{mandateId}_{featureInstanceId}"
if cacheKey not in _billingServices:
_billingServices[cacheKey] = BillingService(currentUser, mandateId, featureInstanceId, featureCode)
else:
_billingServices[cacheKey].setContext(currentUser, mandateId, featureInstanceId, featureCode)
return _billingServices[cacheKey]
class BillingService:
"""
Central billing service for AI operations.
Responsibilities:
- Check balance before operations
- Record usage costs
- Apply pricing markup
- Check provider permissions via RBAC
"""
def __init__(
self,
currentUser: User,
mandateId: str,
featureInstanceId: str = None,
featureCode: str = None
):
"""
Initialize the billing service.
Args:
currentUser: Current user object
mandateId: Mandate ID
featureInstanceId: Optional feature instance ID
featureCode: Optional feature code
"""
self.currentUser = currentUser
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.featureCode = featureCode
# Get billing interface
self._billingInterface = getBillingInterface(currentUser, mandateId)
# Cache settings
self._settingsCache = None
def setContext(
self,
currentUser: User,
mandateId: str,
featureInstanceId: str = None,
featureCode: str = None
):
"""Update service context."""
self.currentUser = currentUser
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.featureCode = featureCode
self._billingInterface = getBillingInterface(currentUser, mandateId)
self._settingsCache = None
def _getSettings(self) -> Optional[Dict[str, Any]]:
"""Get billing settings with caching."""
if self._settingsCache is None:
self._settingsCache = self._billingInterface.getSettings(self.mandateId)
return self._settingsCache
# =========================================================================
# Price Calculation
# =========================================================================
def calculatePriceWithMarkup(self, basePriceCHF: float) -> float:
"""
Calculate final price with markup.
The AICore plugins return prices in their original currency (USD).
This method applies the configured markup percentage.
Args:
basePriceCHF: Base price from AI model (actually USD from provider)
Returns:
Final price in CHF with markup applied
"""
if basePriceCHF <= 0:
return 0.0
# Apply markup (50% = multiply by 1.5)
markup_multiplier = 1 + (BILLING_MARKUP_PERCENT / 100)
return round(basePriceCHF * markup_multiplier, 6)
# =========================================================================
# Balance Operations
# =========================================================================
def checkBalance(self, estimatedCost: float = 0.0) -> BillingCheckResult:
"""
Check if the current user/mandate has sufficient balance.
Args:
estimatedCost: Estimated cost of the operation (with markup applied)
Returns:
BillingCheckResult indicating if operation is allowed
"""
return self._billingInterface.checkBalance(
self.mandateId,
self.currentUser.id,
estimatedCost
)
def hasBalance(self, estimatedCost: float = 0.0) -> bool:
"""
Quick check if balance is sufficient.
Args:
estimatedCost: Estimated cost with markup
Returns:
True if operation is allowed
"""
result = self.checkBalance(estimatedCost)
return result.allowed
def getCurrentBalance(self) -> float:
"""
Get current balance for the user/mandate.
Returns:
Current balance in CHF
"""
result = self.checkBalance(0.0)
return result.currentBalance or 0.0
# =========================================================================
# Usage Recording
# =========================================================================
def recordUsage(
self,
priceCHF: float,
workflowId: str = None,
aicoreProvider: str = None,
description: str = None
) -> Optional[Dict[str, Any]]:
"""
Record AI usage cost as a billing transaction.
This method:
1. Applies the pricing markup
2. Creates a DEBIT transaction
3. Updates the account balance
Args:
priceCHF: Base price from AI model (before markup)
workflowId: Optional workflow ID
aicoreProvider: AICore provider name (e.g., 'anthropic', 'openai')
description: Optional description
Returns:
Created transaction dict or None if not recorded
"""
if priceCHF <= 0:
return None
# Apply markup
finalPrice = self.calculatePriceWithMarkup(priceCHF)
if finalPrice <= 0:
return None
# Build description
if not description:
description = f"AI Usage: {aicoreProvider or 'unknown'}"
return self._billingInterface.recordUsage(
mandateId=self.mandateId,
userId=self.currentUser.id,
priceCHF=finalPrice,
workflowId=workflowId,
featureInstanceId=self.featureInstanceId,
featureCode=self.featureCode,
aicoreProvider=aicoreProvider,
description=description
)
# =========================================================================
# Provider Permission Check (via RBAC)
# =========================================================================
def isProviderAllowed(self, provider: str) -> bool:
"""
Check if the user has permission to use an AICore provider.
Uses RBAC to check for resource permission:
resource.aicore.{provider}
Args:
provider: Provider name (e.g., 'anthropic', 'openai')
Returns:
True if provider is allowed
"""
try:
from modules.security.rbac import RbacClass
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
# Get database connectors
dbApp = DatabaseConnector(
databaseName="poweron_app",
host=APP_CONFIG.get('Database_Host', 'localhost'),
port=int(APP_CONFIG.get('Database_Port', '5432')),
user=APP_CONFIG.get('Database_User', 'admin'),
password=APP_CONFIG.get('Database_Password', 'admin')
)
rbac = RbacClass(dbApp, dbApp)
resourceKey = f"resource.aicore.{provider}"
# Check if user has view permission for this resource (view = use for RESOURCE context)
permissions = rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.RESOURCE,
resourceKey,
mandateId=self.mandateId
)
return permissions.view
except Exception as e:
logger.warning(f"Error checking provider permission: {e}")
# Default to allowed if RBAC check fails
return True
def getallowedProviders(self) -> List[str]:
"""
Get list of AICore providers the user is allowed to use.
Returns:
List of allowed provider names
"""
try:
from modules.aicore.aicoreModelRegistry import modelRegistry
# Get all available providers
connectors = modelRegistry.discoverConnectors()
allProviders = [c.getConnectorType() for c in connectors]
# Filter by RBAC permissions
return [p for p in allProviders if self.isProviderAllowed(p)]
except Exception as e:
logger.warning(f"Error getting allowed providers: {e}")
return []
# =========================================================================
# Admin Operations
# =========================================================================
def addCredit(
self,
amount: float,
description: str = "Manual credit",
referenceType: ReferenceTypeEnum = ReferenceTypeEnum.ADMIN
) -> Optional[Dict[str, Any]]:
"""
Add credit to the account (admin operation).
Args:
amount: Amount to credit (positive)
description: Transaction description
referenceType: Reference type (ADMIN, PAYMENT, SYSTEM)
Returns:
Created transaction dict or None
"""
if amount <= 0:
return None
settings = self._getSettings()
if not settings:
logger.warning(f"No billing settings for mandate {self.mandateId}")
return None
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# Get or create account
if billingModel == BillingModelEnum.PREPAY_USER:
account = self._billingInterface.getOrCreateUserAccount(
self.mandateId,
self.currentUser.id,
initialBalance=0.0
)
else:
account = self._billingInterface.getOrCreateMandateAccount(
self.mandateId,
initialBalance=0.0
)
# Create credit transaction
transaction = BillingTransaction(
accountId=account["id"],
transactionType=TransactionTypeEnum.CREDIT,
amount=amount,
description=description,
referenceType=referenceType
)
return self._billingInterface.createTransaction(transaction)
# =========================================================================
# Statistics & Reporting
# =========================================================================
def getBalancesForUser(self) -> List[BillingBalanceResponse]:
"""
Get all billing balances for the current user.
Returns:
List of balance responses for each mandate
"""
return self._billingInterface.getBalancesForUser(self.currentUser.id)
def getTransactionHistory(self, limit: int = 100) -> List[Dict[str, Any]]:
"""
Get transaction history for the current mandate.
Args:
limit: Maximum number of transactions
Returns:
List of transactions
"""
return self._billingInterface.getTransactionsByMandate(self.mandateId, limit=limit)
# ============================================================================
# Exception Classes
# ============================================================================
class InsufficientBalanceException(Exception):
"""Raised when there's insufficient balance for an operation."""
def __init__(self, currentBalance: float, requiredAmount: float, message: str = None):
self.currentBalance = currentBalance
self.requiredAmount = requiredAmount
self.message = message or f"Insufficient balance. Current: {currentBalance:.2f} CHF, Required: {requiredAmount:.2f} CHF"
super().__init__(self.message)
class ProviderNotAllowedException(Exception):
"""Raised when a user doesn't have permission to use an AI provider."""
def __init__(self, provider: str, message: str = None):
self.provider = provider
self.message = message or f"Provider '{provider}' is not allowed for your role"
super().__init__(self.message)

View file

@ -674,7 +674,8 @@ class ChatService:
return chatLog
def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat:
"""Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list."""
"""Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list.
Also records the usage cost to the billing system if configured."""
try:
# Create ChatStat from AiCallResponse data
statData = {
@ -696,10 +697,69 @@ class ChatService:
workflow.stats = []
workflow.stats.append(stat)
# Record billing transaction if mandateId is available
self._recordBillingUsage(workflow, aiResponse, process)
return stat
except Exception as e:
logger.error(f"Failed to store workflow stat: {e}")
raise
def _recordBillingUsage(self, workflow: Any, aiResponse: Any, process: str) -> None:
"""Record AI usage to the billing system.
This method:
1. Gets the mandate context from services
2. Records the usage cost with 50% markup via BillingService
Args:
workflow: ChatWorkflow object
aiResponse: AI call response with cost information
process: Process identifier for the AI call
"""
try:
# Check if we have mandate context
mandateId = getattr(self.services, 'mandateId', None)
if not mandateId:
logger.debug("No mandate context, skipping billing recording")
return
# Check if there's a cost to record
priceCHF = getattr(aiResponse, 'priceCHF', 0.0)
if not priceCHF or priceCHF <= 0:
return
# Extract provider from model name (e.g., "anthropic.claude-3-sonnet" -> "anthropic")
modelName = getattr(aiResponse, 'modelName', '') or ''
aicoreProvider = modelName.split('.')[0] if '.' in modelName else 'unknown'
# Get feature context if available
featureInstanceId = getattr(self.services, 'featureInstanceId', None)
featureCode = getattr(self.services, 'featureCode', None)
# Import and use BillingService
from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService
billingService = getBillingService(
self.user,
mandateId,
featureInstanceId=featureInstanceId,
featureCode=featureCode
)
# Record the usage (includes 50% markup)
billingService.recordUsage(
priceCHF=priceCHF,
workflowId=workflow.id,
aicoreProvider=aicoreProvider,
description=f"AI Usage: {process}"
)
logger.debug(f"Recorded billing usage: {priceCHF} CHF for {process} (provider: {aicoreProvider})")
except Exception as e:
# Don't fail the main operation if billing recording fails
logger.warning(f"Failed to record billing usage (non-critical): {e}")
def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
"""Update message by delegating to the chat interface"""

View file

@ -91,6 +91,29 @@ NAVIGATION_SECTIONS = [
},
],
},
{
"id": "billing",
"title": {"en": "BILLING", "de": "BILLING", "fr": "FACTURATION"},
"order": 35,
"items": [
{
"id": "billing-dashboard",
"objectKey": "ui.billing.dashboard",
"label": {"en": "Balance", "de": "Guthaben", "fr": "Solde"},
"icon": "FaWallet",
"path": "/billing",
"order": 10,
},
{
"id": "billing-transactions",
"objectKey": "ui.billing.transactions",
"label": {"en": "Transactions", "de": "Transaktionen", "fr": "Transactions"},
"icon": "FaListAlt",
"path": "/billing/transactions",
"order": 20,
},
],
},
{
"id": "admin",
"title": {"en": "ADMINISTRATION", "de": "ADMINISTRATION", "fr": "ADMINISTRATION"},
@ -178,6 +201,15 @@ NAVIGATION_SECTIONS = [
"order": 50,
"adminOnly": True,
},
{
"id": "admin-billing",
"objectKey": "ui.admin.billing",
"label": {"en": "Billing Administration", "de": "Billing-Verwaltung", "fr": "Administration de facturation"},
"icon": "FaMoneyBillAlt",
"path": "/admin/billing",
"order": 60,
"adminOnly": True,
},
],
},
]