gateway/modules/routes/routeBilling.py
patrick-motsch 3777839a5c feat(billing): scope parameter on /view/statistics endpoint
- New query parameter scope: personal/mandate/all
- personal: filters to only current user's transactions (ignores admin role)
- mandate: filters by mandateId parameter
- all: existing RBAC-filtered behavior (default)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-15 10:44:26 +01:00

1303 lines
49 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Billing routes for the backend API.
Implements the endpoints for billing management and usage tracking.
Features:
- User endpoints: View balance, transactions, statistics
- Admin endpoints: Manage settings, add credits, view all accounts
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from datetime import date, datetime
from pydantic import BaseModel, Field
# Import auth module
from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext
# Import billing components
from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.routes.routeDataUsers import _applyFiltersAndSort
from modules.datamodels.datamodelBilling import (
BillingAccount,
BillingTransaction,
BillingSettings,
BillingAddress,
BillingModelEnum,
TransactionTypeEnum,
ReferenceTypeEnum,
PeriodTypeEnum,
BillingBalanceResponse,
BillingStatisticsResponse,
BillingStatisticsChartData,
BillingCheckResult,
)
# Configure logger
logger = logging.getLogger(__name__)
# =============================================================================
# Billing RBAC Data Scope
# =============================================================================
#
# RBAC rules for billing data visibility:
#
# SysAdmin → ALL transactions and statistics across all mandates
# Mandate-Admin → ALL user data within their administrated mandates
# Feature-Instance-Admin→ Data for their administrated feature instances
# Regular User → ONLY their own data within their mandates
#
class BillingDataScope:
"""
Determines what billing data a user can see based on RBAC roles.
Evaluated once per request and used to filter transactions/statistics.
"""
__slots__ = ('isGlobalAdmin', 'adminMandateIds', 'adminFeatureInstanceIds',
'memberMandateIds', 'userId')
def __init__(self, userId: str):
self.isGlobalAdmin: bool = False
self.adminMandateIds: list = []
self.adminFeatureInstanceIds: list = []
self.memberMandateIds: list = []
self.userId: str = userId
def _getBillingDataScope(user) -> BillingDataScope:
"""
Determine what billing data a user can see based on RBAC.
Uses rootInterface (privileged) to check roles across all mandates
and feature instances without RBAC restrictions on the lookup itself.
Returns:
BillingDataScope with the user's visibility boundaries.
"""
scope = BillingDataScope(userId=user.id)
from modules.auth.authentication import _hasSysAdminRole
if _hasSysAdminRole(str(user.id)):
scope.isGlobalAdmin = True
return scope
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
# --- Mandate roles ---
userMandates = rootInterface.getUserMandates(user.id)
for um in userMandates:
mandateId = getattr(um, 'mandateId', None)
umId = getattr(um, 'id', None)
if not mandateId or not umId:
continue
roleIds = rootInterface.getRoleIdsForUserMandate(umId)
isAdmin = False
for roleId in roleIds:
role = rootInterface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
isAdmin = True
break
if isAdmin:
scope.adminMandateIds.append(mandateId)
else:
scope.memberMandateIds.append(mandateId)
# --- Feature instance roles ---
featureAccesses = rootInterface.getFeatureAccessesForUser(user.id)
for fa in featureAccesses:
fiId = getattr(fa, 'featureInstanceId', None)
faId = getattr(fa, 'id', None)
if not fiId or not faId:
continue
roleIds = rootInterface.getRoleIdsForFeatureAccess(faId)
for roleId in roleIds:
role = rootInterface.getRole(roleId)
if role and role.roleLabel == "admin":
scope.adminFeatureInstanceIds.append(fiId)
break
logger.debug(
f"BillingDataScope for user {user.id}: "
f"globalAdmin={scope.isGlobalAdmin}, "
f"adminMandates={scope.adminMandateIds}, "
f"adminInstances={scope.adminFeatureInstanceIds}, "
f"memberMandates={scope.memberMandateIds}"
)
return scope
def _isAdminOfMandate(ctx: RequestContext, targetMandateId: str) -> bool:
"""Check if user is SysAdmin or admin of the specified mandate."""
if ctx.hasSysAdminRole:
return True
try:
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
userMandates = rootInterface.getUserMandates(str(ctx.user.id))
for um in userMandates:
if str(getattr(um, 'mandateId', None)) != str(targetMandateId):
continue
if not getattr(um, 'enabled', True):
continue
umId = str(getattr(um, 'id', ''))
roleIds = rootInterface.getRoleIdsForUserMandate(umId)
for roleId in roleIds:
role = rootInterface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
return True
return False
except Exception:
return False
def _filterTransactionsByScope(transactions: list, scope: BillingDataScope) -> list:
"""
Filter a list of transaction dicts based on the user's BillingDataScope.
Rules:
- SysAdmin: no filter
- Mandate-Admin: all transactions in their admin mandates
- Feature-Instance-Admin: transactions for their admin feature instances
- Regular user: only transactions where createdByUserId/userId matches
"""
if scope.isGlobalAdmin:
return transactions
adminMandateSet = set(scope.adminMandateIds)
adminFiSet = set(scope.adminFeatureInstanceIds)
memberMandateSet = set(scope.memberMandateIds)
result = []
for t in transactions:
mandateId = t.get("mandateId")
fiId = t.get("featureInstanceId")
txUserId = t.get("createdByUserId") or t.get("userId")
# Mandate admin → sees all transactions in their mandate
if mandateId and mandateId in adminMandateSet:
result.append(t)
continue
# Feature instance admin → sees all transactions for their instances
if fiId and fiId in adminFiSet:
result.append(t)
continue
# Regular member → only own transactions
if mandateId and mandateId in memberMandateSet:
if txUserId and txUserId == scope.userId:
result.append(t)
continue
return result
# =============================================================================
# Request/Response Models
# =============================================================================
class CreditAddRequest(BaseModel):
"""Request model for adding credit to an account."""
userId: Optional[str] = Field(None, description="Target user ID (for PREPAY_USER model)")
amount: float = Field(..., gt=0, description="Amount to credit in CHF")
description: str = Field(default="Manual credit", description="Transaction description")
class BillingSettingsUpdate(BaseModel):
"""Request model for updating billing settings."""
billingModel: Optional[BillingModelEnum] = None
defaultUserCredit: Optional[float] = Field(None, ge=0)
warningThresholdPercent: Optional[float] = Field(None, ge=0, le=100)
blockOnZeroBalance: Optional[bool] = None
notifyOnWarning: Optional[bool] = None
notifyEmails: Optional[List[str]] = None
billingAddress: Optional[BillingAddress] = None
class TransactionResponse(BaseModel):
"""Response model for a billing transaction."""
id: str
accountId: str
transactionType: TransactionTypeEnum
amount: float
description: str
referenceType: Optional[ReferenceTypeEnum]
workflowId: Optional[str]
featureCode: Optional[str]
featureInstanceId: Optional[str] = None
aicoreProvider: Optional[str]
aicoreModel: Optional[str] = None
createdByUserId: Optional[str] = None
createdAt: Optional[datetime]
mandateId: Optional[str] = None
mandateName: Optional[str] = None
class AccountSummary(BaseModel):
"""Summary of a billing account."""
id: str
mandateId: str
userId: Optional[str]
accountType: str
balance: float
creditLimit: Optional[float]
warningThreshold: float
enabled: bool
class UsageReportResponse(BaseModel):
"""Usage report for a period."""
period: str
totalCost: float
transactionCount: int
costByProvider: Dict[str, float]
costByModel: Dict[str, float] = {}
costByFeature: Dict[str, float]
# =============================================================================
# Response Models for Mandate/User Views
# =============================================================================
class MandateBalanceResponse(BaseModel):
"""Mandate-level balance summary."""
mandateId: str
mandateName: str
billingModel: str
totalBalance: float
userCount: int
defaultUserCredit: float
warningThresholdPercent: float
blockOnZeroBalance: bool
class UserBalanceResponse(BaseModel):
"""User-level balance summary."""
accountId: str
mandateId: str
mandateName: str
userId: str
userName: str
balance: float
warningThreshold: float
isWarning: bool
enabled: bool
class UserTransactionResponse(BaseModel):
"""User-level transaction with user context."""
id: str
accountId: str
transactionType: TransactionTypeEnum
amount: float
description: str
referenceType: Optional[ReferenceTypeEnum]
workflowId: Optional[str]
featureCode: Optional[str]
featureInstanceId: Optional[str] = None
aicoreProvider: Optional[str]
aicoreModel: Optional[str] = None
createdByUserId: Optional[str] = None
createdAt: Optional[datetime]
mandateId: Optional[str] = None
mandateName: Optional[str] = None
userId: Optional[str] = None
userName: Optional[str] = None
# =============================================================================
# Router Setup
# =============================================================================
router = APIRouter(
prefix="/api/billing",
tags=["Billing"],
responses={404: {"description": "Not found"}}
)
# =============================================================================
# User Endpoints
# =============================================================================
@router.get("/balance", response_model=List[BillingBalanceResponse])
@limiter.limit("60/minute")
def getBalance(
request: Request,
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get billing balances for all mandates the current user belongs to.
Returns balance information for each mandate.
"""
try:
billingService = getBillingService(
ctx.user,
ctx.mandateId,
featureCode="billing"
)
balances = billingService.getBalancesForUser()
return balances
except Exception as e:
logger.error(f"Error getting billing balance: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/balance/{targetMandateId}", response_model=BillingBalanceResponse)
@limiter.limit("60/minute")
def getBalanceForMandate(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get billing balance for a specific mandate.
"""
try:
billingService = getBillingService(
ctx.user,
targetMandateId,
featureCode="billing"
)
# Check balance
checkResult = billingService.checkBalance(0.0)
# Get mandate name from app interface
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
appInterface = getAppInterface(ctx.user, mandateId=targetMandateId)
mandate = appInterface.getMandate(targetMandateId)
mandateName = (mandate.get("label") or mandate.get("name", "")) if mandate else ""
return BillingBalanceResponse(
mandateId=targetMandateId,
mandateName=mandateName,
billingModel=checkResult.billingModel or BillingModelEnum.UNLIMITED,
balance=checkResult.currentBalance or 0.0,
warningThreshold=0.0, # TODO: Get from account
isWarning=False,
creditLimit=None
)
except Exception as e:
logger.error(f"Error getting billing balance for mandate {targetMandateId}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
def getTransactions(
request: Request,
limit: int = Query(default=50, ge=1, le=500),
offset: int = Query(default=0, ge=0),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get transaction history across all mandates the user belongs to.
"""
try:
billingService = getBillingService(
ctx.user,
ctx.mandateId,
featureCode="billing"
)
# Fetch enough transactions for pagination
transactions = billingService.getTransactionHistory(limit=offset + limit)
# Convert to response model
result = []
for t in transactions[offset:offset + limit]:
result.append(TransactionResponse(
id=t.get("id"),
accountId=t.get("accountId"),
transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
amount=t.get("amount", 0.0),
description=t.get("description", ""),
referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
workflowId=t.get("workflowId"),
featureCode=t.get("featureCode"),
featureInstanceId=t.get("featureInstanceId"),
aicoreProvider=t.get("aicoreProvider"),
aicoreModel=t.get("aicoreModel"),
createdByUserId=t.get("createdByUserId"),
createdAt=t.get("_createdAt"),
mandateId=t.get("mandateId"),
mandateName=t.get("mandateName")
))
return result
except Exception as e:
logger.error(f"Error getting billing transactions: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/{period}", response_model=UsageReportResponse)
@limiter.limit("30/minute")
def getStatistics(
request: Request,
period: str = Path(..., description="Period: 'day', 'month', or 'year'"),
year: int = Query(..., description="Year"),
month: Optional[int] = Query(None, description="Month (1-12, required for 'day' period)"),
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get usage statistics for a period.
"""
try:
# Validate period
if period not in ["day", "month", "year"]:
raise HTTPException(status_code=400, detail="Invalid period. Use 'day', 'month', or 'year'")
if period == "day" and not month:
raise HTTPException(status_code=400, detail="Month is required for 'day' period")
billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
settings = billingInterface.getSettings(ctx.mandateId)
if not settings:
return UsageReportResponse(
period=period,
totalCost=0.0,
transactionCount=0,
costByProvider={},
costByFeature={}
)
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# Transactions are always on user accounts (audit trail)
account = billingInterface.getUserAccount(ctx.mandateId, ctx.user.id)
if not account:
return UsageReportResponse(
period=period,
totalCost=0.0,
transactionCount=0,
costByProvider={},
costByFeature={}
)
# Calculate date range
if period == "day":
startDate = date(year, month, 1)
if month == 12:
endDate = date(year + 1, 1, 1)
else:
endDate = date(year, month + 1, 1)
elif period == "month":
startDate = date(year, 1, 1)
endDate = date(year + 1, 1, 1)
else: # year
startDate = date(year, 1, 1)
endDate = date(year + 1, 1, 1)
# Get statistics from transactions
stats = billingInterface.calculateStatisticsFromTransactions(
account["id"],
startDate,
endDate
)
return UsageReportResponse(
period=period,
totalCost=stats.get("totalCostCHF", 0.0),
transactionCount=stats.get("transactionCount", 0),
costByProvider=stats.get("costByProvider", {}),
costByModel=stats.get("costByModel", {}),
costByFeature=stats.get("costByFeature", {})
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting billing statistics: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/providers", response_model=List[str])
@limiter.limit("60/minute")
def getAllowedProviders(
request: Request,
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get list of AICore providers the current user is allowed to use.
"""
try:
billingService = getBillingService(
ctx.user,
ctx.mandateId,
featureCode="billing"
)
return billingService.getallowedProviders()
except Exception as e:
logger.error(f"Error getting allowed providers: {e}")
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Admin Endpoints
# =============================================================================
@router.get("/admin/settings/{targetMandateId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def getSettingsAdmin(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext),
):
"""
Get billing settings for a mandate.
Access: SysAdmin (any mandate) or MandateAdmin (own mandate).
"""
if not _isAdminOfMandate(ctx, targetMandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required for this mandate")
try:
billingInterface = getBillingInterface(ctx.user, targetMandateId)
settings = billingInterface.getSettings(targetMandateId)
if not settings:
raise HTTPException(status_code=404, detail="Billing settings not found")
return settings
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting billing settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/admin/settings/{targetMandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def createOrUpdateSettings(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
settingsUpdate: BillingSettingsUpdate = Body(...),
ctx: RequestContext = Depends(getRequestContext),
_admin = Depends(requireSysAdminRole)
):
"""
Create or update billing settings for a mandate (SysAdmin only).
"""
try:
billingInterface = getBillingInterface(ctx.user, targetMandateId)
existingSettings = billingInterface.getSettings(targetMandateId)
if existingSettings:
updates = settingsUpdate.model_dump(exclude_none=True)
if updates:
# Check if billing model is changing - trigger budget migration
if "billingModel" in updates:
oldModel = BillingModelEnum(existingSettings.get("billingModel", BillingModelEnum.UNLIMITED.value))
newModel = BillingModelEnum(updates["billingModel"]) if isinstance(updates["billingModel"], str) else updates["billingModel"]
if oldModel != newModel:
migrationResult = billingInterface.switchBillingModel(targetMandateId, oldModel, newModel)
logger.info(f"Billing model migration for {targetMandateId}: {migrationResult}")
result = billingInterface.updateSettings(existingSettings["id"], updates)
return result or existingSettings
return existingSettings
else:
from modules.datamodels.datamodelBilling import BillingSettings
newSettings = BillingSettings(
mandateId=targetMandateId,
billingModel=settingsUpdate.billingModel or BillingModelEnum.UNLIMITED,
defaultUserCredit=settingsUpdate.defaultUserCredit or 10.0,
warningThresholdPercent=settingsUpdate.warningThresholdPercent or 10.0,
blockOnZeroBalance=settingsUpdate.blockOnZeroBalance if settingsUpdate.blockOnZeroBalance is not None else True,
notifyOnWarning=settingsUpdate.notifyOnWarning if settingsUpdate.notifyOnWarning is not None else True,
notifyEmails=settingsUpdate.notifyEmails or [],
billingAddress=settingsUpdate.billingAddress
)
return billingInterface.createSettings(newSettings)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating billing settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/admin/credit/{targetMandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def addCredit(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
creditRequest: CreditAddRequest = Body(...),
ctx: RequestContext = Depends(getRequestContext),
_admin = Depends(requireSysAdminRole)
):
"""
Add credit to a billing account (SysAdmin only).
For PREPAY_USER model, specify userId. For PREPAY_MANDATE, leave userId empty.
"""
try:
# Get settings to determine billing model
billingInterface = getBillingInterface(ctx.user, targetMandateId)
settings = billingInterface.getSettings(targetMandateId)
if not settings:
raise HTTPException(status_code=404, detail="Billing settings not found for this mandate")
billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
# Validate request based on billing model
if billingModel == BillingModelEnum.PREPAY_USER:
if not creditRequest.userId:
raise HTTPException(status_code=400, detail="userId is required for PREPAY_USER model")
# Create user-level account if needed and add credit
account = billingInterface.getOrCreateUserAccount(
targetMandateId,
creditRequest.userId,
initialBalance=0.0
)
elif billingModel in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]:
# Create mandate-level account if needed and add credit
account = billingInterface.getOrCreateMandateAccount(targetMandateId, initialBalance=0.0)
else:
raise HTTPException(status_code=400, detail=f"Cannot add credit to {billingModel.value} billing model")
# Create credit transaction
from modules.datamodels.datamodelBilling import BillingTransaction
transaction = BillingTransaction(
accountId=account["id"],
transactionType=TransactionTypeEnum.CREDIT,
amount=creditRequest.amount,
description=creditRequest.description,
referenceType=ReferenceTypeEnum.ADMIN
)
result = billingInterface.createTransaction(transaction)
logger.info(f"Added {creditRequest.amount} CHF credit to account {account['id']} in mandate {targetMandateId}")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error adding credit: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/accounts/{targetMandateId}", response_model=List[AccountSummary])
@limiter.limit("30/minute")
def getAccounts(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext),
):
"""
Get all billing accounts for a mandate.
Access: SysAdmin (any mandate) or MandateAdmin (own mandate).
"""
if not _isAdminOfMandate(ctx, targetMandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required for this mandate")
try:
billingInterface = getBillingInterface(ctx.user, targetMandateId)
# Get all accounts for this mandate via interface
accounts = billingInterface.getAccountsByMandate(targetMandateId)
result = []
for acc in accounts:
result.append(AccountSummary(
id=acc.get("id"),
mandateId=acc.get("mandateId"),
userId=acc.get("userId"),
accountType=acc.get("accountType"),
balance=acc.get("balance", 0.0),
creditLimit=acc.get("creditLimit"),
warningThreshold=acc.get("warningThreshold", 0.0),
enabled=acc.get("enabled", True)
))
return result
except Exception as e:
logger.error(f"Error getting billing accounts: {e}")
raise HTTPException(status_code=500, detail=str(e))
class MandateUserSummary(BaseModel):
"""Summary of a user for billing admin purposes."""
id: str
username: Optional[str] = None
email: Optional[str] = None
firstName: Optional[str] = None
lastName: Optional[str] = None
displayName: Optional[str] = None
@router.get("/admin/users/{targetMandateId}", response_model=List[MandateUserSummary])
@limiter.limit("30/minute")
def getUsersForMandate(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
ctx: RequestContext = Depends(getRequestContext),
):
"""
Get all users belonging to a mandate.
Access: SysAdmin (any mandate) or MandateAdmin (own mandate).
Used by billing admin to select users for credit assignment.
"""
if not _isAdminOfMandate(ctx, targetMandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required for this mandate")
try:
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
appInterface = getAppInterface(ctx.user, mandateId=targetMandateId)
userMandates = appInterface.getUserMandatesByMandate(targetMandateId)
result = []
for um in userMandates:
userId = um.get("userId") if isinstance(um, dict) else getattr(um, "userId", None)
if not userId:
continue
user = appInterface.getUser(userId)
if not user:
continue
# Handle both Pydantic models and dicts
if isinstance(user, dict):
username = user.get("username", "")
firstName = user.get("firstName", "")
lastName = user.get("lastName", "")
email = user.get("email", "")
else:
username = getattr(user, "username", "") or ""
firstName = getattr(user, "firstName", "") or ""
lastName = getattr(user, "lastName", "") or ""
email = getattr(user, "email", "") or ""
displayName = f"{firstName} {lastName}".strip() or username or userId
result.append(MandateUserSummary(
id=userId,
username=username,
email=email,
firstName=firstName,
lastName=lastName,
displayName=displayName
))
return result
except Exception as e:
logger.error(f"Error getting users for mandate {targetMandateId}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/transactions/{targetMandateId}", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
def getTransactionsAdmin(
request: Request,
targetMandateId: str = Path(..., description="Mandate ID"),
limit: int = Query(default=100, ge=1, le=1000),
ctx: RequestContext = Depends(getRequestContext),
):
"""
Get all transactions for a mandate.
Access: SysAdmin (any mandate) or MandateAdmin (own mandate).
"""
if not _isAdminOfMandate(ctx, targetMandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required for this mandate")
try:
billingInterface = getBillingInterface(ctx.user, targetMandateId)
transactions = billingInterface.getTransactionsByMandate(targetMandateId, limit=limit)
result = []
for t in transactions:
result.append(TransactionResponse(
id=t.get("id"),
accountId=t.get("accountId"),
transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
amount=t.get("amount", 0.0),
description=t.get("description", ""),
referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
workflowId=t.get("workflowId"),
featureCode=t.get("featureCode"),
featureInstanceId=t.get("featureInstanceId"),
aicoreProvider=t.get("aicoreProvider"),
aicoreModel=t.get("aicoreModel"),
createdByUserId=t.get("createdByUserId"),
createdAt=t.get("_createdAt")
))
return result
except Exception as e:
logger.error(f"Error getting billing transactions for mandate {targetMandateId}: {e}")
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Mandate View Endpoints (for Admins)
# =============================================================================
@router.get("/view/mandates/balances", response_model=List[MandateBalanceResponse])
@limiter.limit("30/minute")
def getMandateViewBalances(
request: Request,
ctx: RequestContext = Depends(getRequestContext),
_admin = Depends(requireSysAdminRole)
):
"""
Get mandate-level balances (SysAdmin only).
Shows aggregated balances per mandate.
"""
try:
billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
balances = billingInterface.getMandateBalances()
return [MandateBalanceResponse(**b) for b in balances]
except Exception as e:
logger.error(f"Error getting mandate view balances: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/view/mandates/transactions", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
def getMandateViewTransactions(
request: Request,
limit: int = Query(default=100, ge=1, le=1000),
ctx: RequestContext = Depends(getRequestContext),
_admin = Depends(requireSysAdminRole)
):
"""
Get all transactions across mandates (SysAdmin only).
"""
try:
billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
transactions = billingInterface.getMandateTransactions(limit=limit)
result = []
for t in transactions:
result.append(TransactionResponse(
id=t.get("id"),
accountId=t.get("accountId"),
transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
amount=t.get("amount", 0.0),
description=t.get("description", ""),
referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
workflowId=t.get("workflowId"),
featureCode=t.get("featureCode"),
featureInstanceId=t.get("featureInstanceId"),
aicoreProvider=t.get("aicoreProvider"),
aicoreModel=t.get("aicoreModel"),
createdByUserId=t.get("createdByUserId"),
createdAt=t.get("_createdAt"),
mandateId=t.get("mandateId"),
mandateName=t.get("mandateName")
))
return result
except Exception as e:
logger.error(f"Error getting mandate view transactions: {e}")
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# User View Endpoints (RBAC-based)
# =============================================================================
@router.get("/view/users/balances", response_model=List[UserBalanceResponse])
@limiter.limit("30/minute")
def getUserViewBalances(
request: Request,
ctx: RequestContext = Depends(getRequestContext)
):
"""
Get user-level balances.
RBAC filtering:
- SysAdmin: sees all user balances across all mandates
- Mandate-Admin: sees user balances for mandates they administrate
- Regular user: sees only their own balances
"""
try:
billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
# Evaluate RBAC scope
scope = _getBillingDataScope(ctx.user)
# Determine mandate IDs for data loading
if scope.isGlobalAdmin:
mandateIds = None
else:
mandateIds = scope.adminMandateIds + scope.memberMandateIds
if not mandateIds:
return []
allBalances = billingInterface.getUserBalancesForMandates(mandateIds)
# RBAC filter: mandate admins see all in their mandates, regular users only own
if not scope.isGlobalAdmin:
adminMandateSet = set(scope.adminMandateIds)
allBalances = [
b for b in allBalances
if b.get("mandateId") in adminMandateSet or b.get("userId") == scope.userId
]
return [UserBalanceResponse(**b) for b in allBalances]
except Exception as e:
logger.error(f"Error getting user view balances: {e}")
raise HTTPException(status_code=500, detail=str(e))
class ViewStatisticsResponse(BaseModel):
"""Aggregated statistics across all user's mandates."""
totalCost: float = 0.0
transactionCount: int = 0
costByProvider: Dict[str, float] = {}
costByModel: Dict[str, float] = {}
costByFeature: Dict[str, float] = {}
costByMandate: Dict[str, float] = {}
timeSeries: List[Dict[str, Any]] = []
@router.get("/view/statistics")
@limiter.limit("30/minute")
def getUserViewStatistics(
request: Request,
period: str = Query(default="month", description="Period: 'day' or 'month'"),
year: int = Query(default=None, description="Year"),
month: Optional[int] = Query(None, description="Month (1-12, required for period='day')"),
scope: str = Query(default="all", description="Scope: 'personal' (own costs only), 'mandate' (filter by mandateId), 'all' (RBAC-filtered)"),
mandateId: Optional[str] = Query(None, description="Mandate ID filter (used with scope='mandate')"),
ctx: RequestContext = Depends(getRequestContext)
) -> ViewStatisticsResponse:
"""
Get aggregated usage statistics across all user's mandates.
Scope:
- personal: only the current user's own transactions (ignores admin role)
- mandate: transactions for a specific mandate (requires mandateId parameter)
- all: RBAC-filtered (SysAdmin sees everything, admin sees mandate, user sees own)
- period='month': returns monthly time series for the given year
- period='day': returns daily time series for the given month/year
"""
try:
from datetime import timedelta
if year is None:
year = datetime.now().year
if period == "day" and not month:
month = datetime.now().month
billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
# Evaluate RBAC scope
rbacScope = _getBillingDataScope(ctx.user)
# Determine mandate IDs for data loading
if rbacScope.isGlobalAdmin:
loadMandateIds = None
else:
loadMandateIds = rbacScope.adminMandateIds + rbacScope.memberMandateIds
if not loadMandateIds:
logger.warning("No mandate IDs found for user")
return ViewStatisticsResponse()
# Scope=mandate: restrict to specific mandate
if scope == "mandate" and mandateId:
loadMandateIds = [mandateId]
# Get all transactions
allTransactions = billingInterface.getUserTransactionsForMandates(loadMandateIds, limit=10000)
# Apply RBAC filter (respects admin/user roles)
allTransactions = _filterTransactionsByScope(allTransactions, rbacScope)
# Scope=personal: further filter to only own transactions
if scope == "personal":
userId = str(ctx.user.id)
allTransactions = [
t for t in allTransactions
if (t.get("createdByUserId") or t.get("userId")) == userId
]
logger.info(f"View statistics: {len(allTransactions)} RBAC-filtered transactions for period={period}, year={year}, month={month}")
# Calculate date range
if period == "day":
startDate = date(year, month, 1)
if month == 12:
endDate = date(year + 1, 1, 1)
else:
endDate = date(year, month + 1, 1)
else:
startDate = date(year, 1, 1)
endDate = date(year + 1, 1, 1)
# Filter by date range and only DEBIT transactions
debits = []
skippedNoDate = 0
skippedDateRange = 0
skippedNotDebit = 0
for t in allTransactions:
createdAt = t.get("_createdAt")
if not createdAt:
skippedNoDate += 1
continue
# Parse date from various formats (DB stores as DOUBLE PRECISION / Unix timestamp)
txDate = None
if isinstance(createdAt, (int, float)):
txDate = datetime.fromtimestamp(createdAt).date()
elif isinstance(createdAt, datetime):
txDate = createdAt.date()
elif isinstance(createdAt, date) and not isinstance(createdAt, datetime):
txDate = createdAt
elif isinstance(createdAt, str):
try:
# Try as float string first (Unix timestamp)
txDate = datetime.fromtimestamp(float(createdAt)).date()
except (ValueError, TypeError):
try:
txDate = datetime.fromisoformat(createdAt.replace("Z", "+00:00")).date()
except (ValueError, TypeError):
skippedNoDate += 1
continue
else:
skippedNoDate += 1
continue
if txDate < startDate or txDate >= endDate:
skippedDateRange += 1
continue
# Compare transactionType - handle both string and enum
txType = t.get("transactionType")
txTypeStr = str(txType) if txType is not None else ""
if txTypeStr != "DEBIT" and txTypeStr != "TransactionTypeEnum.DEBIT":
# Also check .value for enum objects
txTypeValue = getattr(txType, 'value', txTypeStr)
if txTypeValue != "DEBIT":
skippedNotDebit += 1
continue
t["_txDate"] = txDate
debits.append(t)
logger.info(f"View statistics: {len(debits)} DEBIT transactions after filter. "
f"Skipped: noDate={skippedNoDate}, dateRange={skippedDateRange}, notDebit={skippedNotDebit}")
# Aggregate totals
totalCost = sum(t.get("amount", 0) for t in debits)
costByProvider: Dict[str, float] = {}
costByModel: Dict[str, float] = {}
costByFeature: Dict[str, float] = {}
costByMandate: Dict[str, float] = {}
for t in debits:
provider = t.get("aicoreProvider") or "unknown"
costByProvider[provider] = costByProvider.get(provider, 0) + t.get("amount", 0)
model = t.get("aicoreModel") or "unknown"
costByModel[model] = costByModel.get(model, 0) + t.get("amount", 0)
mandate = t.get("mandateName") or t.get("mandateId") or "unknown"
featureCode = t.get("featureCode") or "unknown"
featureKey = f"{mandate} / {featureCode}"
costByFeature[featureKey] = costByFeature.get(featureKey, 0) + t.get("amount", 0)
mandate = t.get("mandateName") or t.get("mandateId") or "unknown"
costByMandate[mandate] = costByMandate.get(mandate, 0) + t.get("amount", 0)
# Build time series (raw data only, no display logic)
timeSeries = []
if period == "day":
numDays = (endDate - startDate).days
for day in range(numDays):
d = startDate + timedelta(days=day)
dayCost = sum(t.get("amount", 0) for t in debits if t["_txDate"] == d)
dayCount = sum(1 for t in debits if t["_txDate"] == d)
if dayCost > 0 or dayCount > 0:
timeSeries.append({
"date": d.isoformat(),
"cost": round(dayCost, 4),
"count": dayCount
})
else:
for m in range(1, 13):
mStart = date(year, m, 1)
mEnd = date(year, m + 1, 1) if m < 12 else date(year + 1, 1, 1)
monthCost = sum(t.get("amount", 0) for t in debits if mStart <= t["_txDate"] < mEnd)
monthCount = sum(1 for t in debits if mStart <= t["_txDate"] < mEnd)
timeSeries.append({
"date": f"{year}-{m:02d}",
"cost": round(monthCost, 4),
"count": monthCount
})
return ViewStatisticsResponse(
totalCost=round(totalCost, 4),
transactionCount=len(debits),
costByProvider=costByProvider,
costByModel=costByModel,
costByFeature=costByFeature,
costByMandate=costByMandate,
timeSeries=timeSeries
)
except Exception as e:
logger.error(f"Error getting view statistics: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/view/users/transactions", response_model=PaginatedResponse[UserTransactionResponse])
@limiter.limit("30/minute")
def getUserViewTransactions(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
ctx: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[UserTransactionResponse]:
"""
Get user-level transactions with pagination support.
RBAC filtering:
- SysAdmin: sees all user transactions across all mandates
- Mandate-Admin: sees all user transactions for mandates they administrate
- Feature-Instance-Admin: sees transactions for their feature instances
- Regular user: sees only their own transactions
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
"""
try:
billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
# Parse pagination params
paginationParams = None
if pagination:
import json
paginationDict = json.loads(pagination)
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
# Evaluate RBAC scope
scope = _getBillingDataScope(ctx.user)
# Determine mandate IDs for data loading
if scope.isGlobalAdmin:
mandateIds = None # Load all
else:
# Load data for all mandates the user belongs to (admin + member)
mandateIds = scope.adminMandateIds + scope.memberMandateIds
if not mandateIds:
return PaginatedResponse(items=[], pagination=None)
allTransactions = billingInterface.getUserTransactionsForMandates(mandateIds, limit=10000)
# Apply RBAC filter
allTransactions = _filterTransactionsByScope(allTransactions, scope)
logger.debug(f"RBAC-filtered {len(allTransactions)} transactions for user {ctx.user.id}")
# Convert to response objects as dicts for filtering/sorting
transactionDicts = []
for t in allTransactions:
transactionDicts.append({
"id": t.get("id"),
"accountId": t.get("accountId"),
"transactionType": t.get("transactionType", "DEBIT"),
"amount": t.get("amount", 0.0),
"description": t.get("description", ""),
"referenceType": t.get("referenceType"),
"workflowId": t.get("workflowId"),
"featureCode": t.get("featureCode"),
"featureInstanceId": t.get("featureInstanceId"),
"aicoreProvider": t.get("aicoreProvider"),
"aicoreModel": t.get("aicoreModel"),
"createdByUserId": t.get("createdByUserId"),
"createdAt": t.get("_createdAt"),
"mandateId": t.get("mandateId"),
"mandateName": t.get("mandateName"),
"userId": t.get("userId"),
"userName": t.get("userName"),
})
# Apply filters and sorting
filteredDicts = _applyFiltersAndSort(transactionDicts, paginationParams)
# Convert to response models
def _toResponse(d):
return UserTransactionResponse(
id=d.get("id"),
accountId=d.get("accountId"),
transactionType=TransactionTypeEnum(d.get("transactionType", "DEBIT")),
amount=d.get("amount", 0.0),
description=d.get("description", ""),
referenceType=ReferenceTypeEnum(d["referenceType"]) if d.get("referenceType") else None,
workflowId=d.get("workflowId"),
featureCode=d.get("featureCode"),
featureInstanceId=d.get("featureInstanceId"),
aicoreProvider=d.get("aicoreProvider"),
aicoreModel=d.get("aicoreModel"),
createdByUserId=d.get("createdByUserId"),
createdAt=d.get("createdAt"),
mandateId=d.get("mandateId"),
mandateName=d.get("mandateName"),
userId=d.get("userId"),
userName=d.get("userName")
)
if paginationParams:
import math
totalItems = len(filteredDicts)
totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
startIdx = (paginationParams.page - 1) * paginationParams.pageSize
endIdx = startIdx + paginationParams.pageSize
paginatedDicts = filteredDicts[startIdx:endIdx]
return PaginatedResponse(
items=[_toResponse(d) for d in paginatedDicts],
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=totalItems,
totalPages=totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=[_toResponse(d) for d in filteredDicts],
pagination=None
)
except Exception as e:
logger.error(f"Error getting user view transactions: {e}")
raise HTTPException(status_code=500, detail=str(e))