# gateway/modules/routes/routeBilling.py
#
# 1082 lines
# 40 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Billing routes for the backend API.
Implements the endpoints for billing management and usage tracking.
Features:
- User endpoints: View balance, transactions, statistics
- Admin endpoints: Manage settings, add credits, view all accounts
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from datetime import date, datetime
from pydantic import BaseModel, Field
# Import auth module
from modules.auth import limiter, requireSysAdmin, getRequestContext, RequestContext
# Import billing components
from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface
from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.routes.routeDataUsers import _applyFiltersAndSort
from modules.datamodels.datamodelBilling import (
BillingAccount,
BillingTransaction,
BillingSettings,
BillingAddress,
BillingModelEnum,
TransactionTypeEnum,
ReferenceTypeEnum,
PeriodTypeEnum,
BillingBalanceResponse,
BillingStatisticsResponse,
BillingStatisticsChartData,
BillingCheckResult,
)
# Module-level logger named after this module; the route handlers below use it
# to record failures before translating them into HTTP error responses.
logger = logging.getLogger(__name__)
# =============================================================================
# Request/Response Models
# =============================================================================
class CreditAddRequest(BaseModel):
    """Request model for adding credit to an account.

    Request body of ``POST /admin/credit/{targetMandateId}``.
    """
    # Only consulted by the addCredit handler when the mandate uses the
    # PREPAY_USER billing model; that handler answers 400 if it is missing then.
    userId: Optional[str] = Field(None, description="Target user ID (for PREPAY_USER model)")
    # Required and strictly positive (gt=0).
    amount: float = Field(..., gt=0, description="Amount to credit in CHF")
    # Stored as the description of the resulting credit transaction.
    description: str = Field(default="Manual credit", description="Transaction description")
class BillingSettingsUpdate(BaseModel):
    """Request model for updating billing settings.

    Every field is optional and defaults to None. The createOrUpdateSettings
    handler dumps this model with ``exclude_none=True`` when updating, so a
    field left as None is not part of the update.
    """
    billingModel: Optional[BillingModelEnum] = None
    # Non-negative when provided (ge=0).
    defaultUserCredit: Optional[float] = Field(None, ge=0)
    # Percentage between 0 and 100 inclusive when provided.
    warningThresholdPercent: Optional[float] = Field(None, ge=0, le=100)
    blockOnZeroBalance: Optional[bool] = None
    notifyOnWarning: Optional[bool] = None
    notifyEmails: Optional[List[str]] = None
    billingAddress: Optional[BillingAddress] = None
class TransactionResponse(BaseModel):
    """Response model for a billing transaction.

    Built by the transaction endpoints from the transaction dicts returned by
    the billing service/interface.
    """
    id: str
    accountId: str
    transactionType: TransactionTypeEnum
    amount: float
    description: str
    referenceType: Optional[ReferenceTypeEnum]
    workflowId: Optional[str]
    featureCode: Optional[str]
    aicoreProvider: Optional[str]
    # Filled from the transaction's "_createdAt" key by the handlers.
    createdAt: Optional[datetime]
    # Mandate context; the per-mandate admin endpoint does not set these two.
    mandateId: Optional[str] = None
    mandateName: Optional[str] = None
class AccountSummary(BaseModel):
    """Summary of a billing account.

    Item type of ``GET /admin/accounts/{targetMandateId}``; populated from the
    account dicts returned by ``getAccountsByMandate``.
    """
    id: str
    mandateId: str
    userId: Optional[str]
    accountType: str
    # The getAccounts handler falls back to 0.0 when the account has no "balance" key.
    balance: float
    creditLimit: Optional[float]
    # The getAccounts handler falls back to 0.0 when the key is absent.
    warningThreshold: float
    # The getAccounts handler falls back to True when the key is absent.
    enabled: bool
class UsageReportResponse(BaseModel):
    """Usage report for a period.

    Response of ``GET /statistics/{period}``.
    """
    # Echo of the requested period path parameter ('day', 'month' or 'year').
    period: str
    # Taken from the "totalCostCHF" entry of the computed statistics.
    totalCost: float
    transactionCount: int
    costByProvider: Dict[str, float]
    costByFeature: Dict[str, float]
# =============================================================================
# Response Models for Mandate/User Views
# =============================================================================
class MandateBalanceResponse(BaseModel):
    """Mandate-level balance summary.

    Item type of ``GET /view/mandates/balances``; each instance is created by
    unpacking one dict returned by ``getMandateBalances()``, so those dicts
    must supply every field below.
    """
    mandateId: str
    mandateName: str
    billingModel: str
    totalBalance: float
    userCount: int
    defaultUserCredit: float
    warningThresholdPercent: float
    blockOnZeroBalance: bool
class UserBalanceResponse(BaseModel):
    """User-level balance summary.

    Item type of ``GET /view/users/balances``; each instance is created by
    unpacking one dict returned by ``getUserBalancesForMandates()``, so those
    dicts must supply every field below.
    """
    accountId: str
    mandateId: str
    mandateName: str
    # Compared against the caller's id to restrict non-SysAdmin results.
    userId: str
    userName: str
    balance: float
    warningThreshold: float
    isWarning: bool
    enabled: bool
class UserTransactionResponse(TransactionResponse):
    """User-level transaction with user context.

    Carries every field of ``TransactionResponse`` (in the same order) and
    adds the owning user. Inheriting instead of repeating the field list keeps
    the two response shapes from drifting apart.
    """
    # Owning user of the transaction's account; both are optional.
    userId: Optional[str] = None
    userName: Optional[str] = None
# =============================================================================
# Router Setup
# =============================================================================
# Router on which the endpoints below are registered. They share the
# /api/billing prefix and the "Billing" tag, and document a generic 404.
router = APIRouter(
    prefix="/api/billing",
    tags=["Billing"],
    responses={404: {"description": "Not found"}}
)
# =============================================================================
# User Endpoints
# =============================================================================
@router.get("/balance", response_model=List[BillingBalanceResponse])
@limiter.limit("60/minute")
async def getBalance(
    request: Request,
    ctx: RequestContext = Depends(getRequestContext)
):
    """
    Return the calling user's billing balances.

    A billing service is created for the request's user and mandate and the
    result of its ``getBalancesForUser()`` is returned as-is (a list of
    per-mandate balance entries according to the response model).

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        service = getBillingService(ctx.user, ctx.mandateId, featureCode="billing")
        return service.getBalancesForUser()
    except Exception as e:
        logger.error(f"Error getting billing balance: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/balance/{targetMandateId}", response_model=BillingBalanceResponse)
@limiter.limit("60/minute")
async def getBalanceForMandate(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    ctx: RequestContext = Depends(getRequestContext)
):
    """
    Get billing balance for a specific mandate.

    Runs a zero-cost balance check against the target mandate to obtain the
    billing model and current balance, then looks up the mandate's name.

    Args:
        request: Incoming request (required by the rate limiter).
        targetMandateId: Mandate whose balance is requested.
        ctx: Request context carrying the authenticated user.

    Returns:
        BillingBalanceResponse. ``warningThreshold``, ``isWarning`` and
        ``creditLimit`` are placeholders (0.0 / False / None) for now.

    Raises:
        HTTPException: passed through unchanged when raised by a lower layer;
            500 with the exception text for any other failure.
    """
    try:
        billingService = getBillingService(
            ctx.user,
            targetMandateId,
            featureCode="billing"
        )
        # A check for 0.0 never blocks; it is only used to read model and balance
        checkResult = billingService.checkBalance(0.0)

        # Get mandate name from app interface
        from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
        appInterface = getAppInterface(ctx.user, mandateId=targetMandateId)
        mandate = appInterface.getMandate(targetMandateId)

        # Other handlers in this module cope with both dicts and model objects
        # coming back from the app interface; do the same here, and never pass
        # None on as the name.
        if isinstance(mandate, dict):
            mandateName = mandate.get("name") or ""
        else:
            mandateName = getattr(mandate, "name", None) or ""

        return BillingBalanceResponse(
            mandateId=targetMandateId,
            mandateName=mandateName,
            billingModel=checkResult.billingModel or BillingModelEnum.UNLIMITED,
            balance=checkResult.currentBalance or 0.0,
            warningThreshold=0.0,  # TODO: Get from account
            isWarning=False,
            creditLimit=None
        )
    except HTTPException:
        # Keep deliberate HTTP errors (status + detail) intact
        raise
    except Exception as e:
        logger.error(f"Error getting billing balance for mandate {targetMandateId}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
async def getTransactions(
    request: Request,
    limit: int = Query(default=50, ge=1, le=500),
    offset: int = Query(default=0, ge=0),
    ctx: RequestContext = Depends(getRequestContext)
):
    """
    Return one page of the caller's transaction history.

    The billing service is asked for the first ``offset + limit`` transactions
    and the slice ``[offset:offset + limit]`` of that result is converted into
    response models.

    Args:
        request: Incoming request (required by the rate limiter).
        limit: Page size, 1-500.
        offset: Number of leading transactions to skip.
        ctx: Request context carrying the authenticated user and mandate.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        service = getBillingService(ctx.user, ctx.mandateId, featureCode="billing")

        # The service only takes a limit, so fetch up to the end of the page
        pageEnd = offset + limit
        history = service.getTransactionHistory(limit=pageEnd)

        return [
            TransactionResponse(
                id=row.get("id"),
                accountId=row.get("accountId"),
                transactionType=TransactionTypeEnum(row.get("transactionType", "DEBIT")),
                amount=row.get("amount", 0.0),
                description=row.get("description", ""),
                referenceType=ReferenceTypeEnum(row["referenceType"]) if row.get("referenceType") else None,
                workflowId=row.get("workflowId"),
                featureCode=row.get("featureCode"),
                aicoreProvider=row.get("aicoreProvider"),
                createdAt=row.get("_createdAt"),
                mandateId=row.get("mandateId"),
                mandateName=row.get("mandateName")
            )
            for row in history[offset:pageEnd]
        ]
    except Exception as e:
        logger.error(f"Error getting billing transactions: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/{period}", response_model=UsageReportResponse)
@limiter.limit("30/minute")
async def getStatistics(
    request: Request,
    period: str = Path(..., description="Period: 'day', 'month', or 'year'"),
    year: int = Query(..., description="Year"),
    month: Optional[int] = Query(None, description="Month (1-12, required for 'day' period)"),
    ctx: RequestContext = Depends(getRequestContext)
):
    """
    Get usage statistics for a period.

    For period 'day' the report covers the given month of ``year``; for
    'month' and 'year' it covers the whole calendar year. The statistics are
    computed from the transactions of the user's account (PREPAY_USER billing
    model) or of the mandate's account (any other model).

    Args:
        request: Incoming request (required by the rate limiter).
        period: 'day', 'month' or 'year'.
        year: Calendar year of the report.
        month: Month 1-12; required (and only used) for period 'day'.
        ctx: Request context carrying the authenticated user and mandate.

    Returns:
        UsageReportResponse; all-zero when the mandate has no billing settings
        or no matching account.

    Raises:
        HTTPException: 400 for an invalid period, month or year; 500 with the
            exception text for unexpected failures.
    """
    def _emptyReport() -> UsageReportResponse:
        # Shared "nothing to report" answer for missing settings / account
        return UsageReportResponse(
            period=period,
            totalCost=0.0,
            transactionCount=0,
            costByProvider={},
            costByFeature={}
        )

    try:
        # Validate period
        if period not in ("day", "month", "year"):
            raise HTTPException(status_code=400, detail="Invalid period. Use 'day', 'month', or 'year'")
        if period == "day":
            if not month:
                raise HTTPException(status_code=400, detail="Month is required for 'day' period")
            if not 1 <= month <= 12:
                raise HTTPException(status_code=400, detail="Invalid month. Use a value from 1 to 12")
        # The range end is built from year + 1, so the last representable year
        # is excluded; without this check date() raises and the client gets a 500.
        if not date.min.year <= year < date.max.year:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid year. Use a value from {date.min.year} to {date.max.year - 1}"
            )

        billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
        settings = billingInterface.getSettings(ctx.mandateId)
        if not settings:
            return _emptyReport()

        billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))

        # Get the relevant account
        if billingModel == BillingModelEnum.PREPAY_USER:
            account = billingInterface.getUserAccount(ctx.mandateId, ctx.user.id)
        else:
            account = billingInterface.getMandateAccount(ctx.mandateId)
        if not account:
            return _emptyReport()

        # Calculate date range as [startDate, endDate)
        if period == "day":
            startDate = date(year, month, 1)
            endDate = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        else:
            # 'month' and 'year' both span the full calendar year
            startDate = date(year, 1, 1)
            endDate = date(year + 1, 1, 1)

        # Get statistics from transactions
        stats = billingInterface.calculateStatisticsFromTransactions(
            account["id"],
            startDate,
            endDate
        )
        return UsageReportResponse(
            period=period,
            totalCost=stats.get("totalCostCHF", 0.0),
            transactionCount=stats.get("transactionCount", 0),
            costByProvider=stats.get("costByProvider", {}),
            costByFeature=stats.get("costByFeature", {})
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting billing statistics: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/providers", response_model=List[str])
@limiter.limit("60/minute")
async def getAllowedProviders(
    request: Request,
    ctx: RequestContext = Depends(getRequestContext)
):
    """
    Return the AICore providers the calling user may use.

    Delegates to the billing service created for the request's user and
    mandate and returns its provider list unchanged.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        service = getBillingService(ctx.user, ctx.mandateId, featureCode="billing")
        providers = service.getallowedProviders()
        return providers
    except Exception as e:
        logger.error(f"Error getting allowed providers: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Admin Endpoints
# =============================================================================
@router.get("/admin/settings/{targetMandateId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def getSettingsAdmin(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    Return the billing settings of a mandate (SysAdmin only).

    Args:
        request: Incoming request (required by the rate limiter).
        targetMandateId: Mandate whose settings are read.
        ctx: Request context carrying the authenticated user.
        _admin: SysAdmin guard dependency; its value is not used.

    Raises:
        HTTPException: 404 when the mandate has no billing settings; 500 with
            the exception text for unexpected failures.
    """
    try:
        interface = getBillingInterface(ctx.user, targetMandateId)
        found = interface.getSettings(targetMandateId)
        if found:
            return found
        raise HTTPException(status_code=404, detail="Billing settings not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting billing settings: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/admin/settings/{targetMandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
async def createOrUpdateSettings(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    settingsUpdate: BillingSettingsUpdate = Body(...),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    Create or update billing settings for a mandate (SysAdmin only).

    When settings already exist, only the fields that were provided (not None)
    are updated. Otherwise new settings are created, using defaults for every
    field that was not provided: UNLIMITED billing model, 10.0 default user
    credit, 10.0 warning threshold percent, blocking and warning notification
    enabled, and no notification e-mails.

    Args:
        request: Incoming request (required by the rate limiter).
        targetMandateId: Mandate whose settings are written.
        settingsUpdate: Partial settings; None means "not provided".
        ctx: Request context carrying the authenticated user.
        _admin: SysAdmin guard dependency; its value is not used.

    Returns:
        The updated or newly created settings; the unchanged existing settings
        when nothing was provided or the update returned nothing.

    Raises:
        HTTPException: 500 with the exception text for unexpected failures.
    """
    def _valueOr(value, fallback):
        # Only None means "not provided". An explicit 0 / 0.0 / False is a
        # legitimate setting and must not be replaced by the default.
        return fallback if value is None else value

    try:
        billingInterface = getBillingInterface(ctx.user, targetMandateId)
        existingSettings = billingInterface.getSettings(targetMandateId)

        if existingSettings:
            # Update existing settings with the provided fields only
            updates = settingsUpdate.model_dump(exclude_none=True)
            if not updates:
                return existingSettings
            result = billingInterface.updateSettings(existingSettings["id"], updates)
            return result or existingSettings

        # Create new settings
        newSettings = BillingSettings(
            mandateId=targetMandateId,
            billingModel=_valueOr(settingsUpdate.billingModel, BillingModelEnum.UNLIMITED),
            defaultUserCredit=_valueOr(settingsUpdate.defaultUserCredit, 10.0),
            warningThresholdPercent=_valueOr(settingsUpdate.warningThresholdPercent, 10.0),
            blockOnZeroBalance=_valueOr(settingsUpdate.blockOnZeroBalance, True),
            notifyOnWarning=_valueOr(settingsUpdate.notifyOnWarning, True),
            notifyEmails=_valueOr(settingsUpdate.notifyEmails, []),
            billingAddress=settingsUpdate.billingAddress
        )
        return billingInterface.createSettings(newSettings)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating billing settings: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/admin/credit/{targetMandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
async def addCredit(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    creditRequest: CreditAddRequest = Body(...),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    Book a manual credit on a billing account (SysAdmin only).

    The mandate's billing model decides which account is credited:
    PREPAY_USER credits the account of ``creditRequest.userId`` (required in
    that case); PREPAY_MANDATE and CREDIT_POSTPAY credit the mandate account.
    The account is created with a zero balance if it does not exist yet.

    Args:
        request: Incoming request (required by the rate limiter).
        targetMandateId: Mandate owning the account.
        creditRequest: Amount, description and optional target user.
        ctx: Request context carrying the authenticated user.
        _admin: SysAdmin guard dependency; its value is not used.

    Returns:
        Whatever ``createTransaction`` returns for the new CREDIT transaction.

    Raises:
        HTTPException: 404 when the mandate has no billing settings; 400 when
            ``userId`` is missing for PREPAY_USER or the billing model cannot
            take credit; 500 with the exception text for unexpected failures.
    """
    try:
        interface = getBillingInterface(ctx.user, targetMandateId)

        # The billing model stored in the settings selects the account type
        mandateSettings = interface.getSettings(targetMandateId)
        if not mandateSettings:
            raise HTTPException(status_code=404, detail="Billing settings not found for this mandate")
        model = BillingModelEnum(mandateSettings.get("billingModel", BillingModelEnum.UNLIMITED.value))

        if model == BillingModelEnum.PREPAY_USER:
            targetUserId = creditRequest.userId
            if not targetUserId:
                raise HTTPException(status_code=400, detail="userId is required for PREPAY_USER model")
            # User-level account, created on demand
            account = interface.getOrCreateUserAccount(targetMandateId, targetUserId, initialBalance=0.0)
        elif model in (BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY):
            # Mandate-level account, created on demand
            account = interface.getOrCreateMandateAccount(targetMandateId, initialBalance=0.0)
        else:
            raise HTTPException(status_code=400, detail=f"Cannot add credit to {model.value} billing model")

        # Book the credit as an ADMIN-referenced CREDIT transaction
        creditTransaction = BillingTransaction(
            accountId=account["id"],
            transactionType=TransactionTypeEnum.CREDIT,
            amount=creditRequest.amount,
            description=creditRequest.description,
            referenceType=ReferenceTypeEnum.ADMIN
        )
        created = interface.createTransaction(creditTransaction)
        logger.info(f"Added {creditRequest.amount} CHF credit to account {account['id']} in mandate {targetMandateId}")
        return created
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error adding credit: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/accounts/{targetMandateId}", response_model=List[AccountSummary])
@limiter.limit("30/minute")
async def getAccounts(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    List the billing accounts of a mandate (SysAdmin only).

    Each account dict returned by ``getAccountsByMandate`` is mapped to an
    ``AccountSummary``; missing ``balance`` / ``warningThreshold`` fall back to
    0.0 and a missing ``enabled`` flag to True.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        interface = getBillingInterface(ctx.user, targetMandateId)
        accountRows = interface.getAccountsByMandate(targetMandateId)
        return [
            AccountSummary(
                id=row.get("id"),
                mandateId=row.get("mandateId"),
                userId=row.get("userId"),
                accountType=row.get("accountType"),
                balance=row.get("balance", 0.0),
                creditLimit=row.get("creditLimit"),
                warningThreshold=row.get("warningThreshold", 0.0),
                enabled=row.get("enabled", True)
            )
            for row in accountRows
        ]
    except Exception as e:
        logger.error(f"Error getting billing accounts: {e}")
        raise HTTPException(status_code=500, detail=str(e))
class MandateUserSummary(BaseModel):
    """Summary of a user for billing admin purposes.

    Item type of ``GET /admin/users/{targetMandateId}``.
    """
    id: str
    email: Optional[str] = None
    firstName: Optional[str] = None
    lastName: Optional[str] = None
    # The handler sets this to "<firstName> <lastName>", falling back to the
    # e-mail and then to the user id when the name is empty.
    displayName: Optional[str] = None
@router.get("/admin/users/{targetMandateId}", response_model=List[MandateUserSummary])
@limiter.limit("30/minute")
async def getUsersForMandate(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    Get all users belonging to a mandate (SysAdmin only).
    Used by billing admin to select users for credit assignment.

    Memberships without a user id and users that cannot be loaded are
    skipped. ``displayName`` is "<firstName> <lastName>", falling back to the
    e-mail and then to the user id when no name is available.

    Args:
        request: Incoming request (required by the rate limiter).
        targetMandateId: Mandate whose users are listed.
        ctx: Request context carrying the authenticated user.
        _admin: SysAdmin guard dependency; its value is not used.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    def _textField(record, name: str) -> str:
        # Records may be dicts or model objects. A missing or None value
        # becomes "" in both cases, so displayName never contains "None".
        if isinstance(record, dict):
            value = record.get(name)
        else:
            value = getattr(record, name, None)
        return value or ""

    try:
        from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
        appInterface = getAppInterface(ctx.user, mandateId=targetMandateId)
        userMandates = appInterface.getUserMandatesByMandate(targetMandateId)

        result = []
        for um in userMandates:
            userId = um.get("userId") if isinstance(um, dict) else getattr(um, "userId", None)
            if not userId:
                continue
            user = appInterface.getUser(userId)
            if not user:
                continue

            firstName = _textField(user, "firstName")
            lastName = _textField(user, "lastName")
            email = _textField(user, "email")
            displayName = f"{firstName} {lastName}".strip() or email or userId

            result.append(MandateUserSummary(
                id=userId,
                email=email,
                firstName=firstName,
                lastName=lastName,
                displayName=displayName
            ))
        return result
    except Exception as e:
        logger.error(f"Error getting users for mandate {targetMandateId}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin/transactions/{targetMandateId}", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
async def getTransactionsAdmin(
    request: Request,
    targetMandateId: str = Path(..., description="Mandate ID"),
    limit: int = Query(default=100, ge=1, le=1000),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    List the transactions of one mandate (SysAdmin only).

    Args:
        request: Incoming request (required by the rate limiter).
        targetMandateId: Mandate whose transactions are listed.
        limit: Maximum number of transactions to fetch, 1-1000.
        ctx: Request context carrying the authenticated user.
        _admin: SysAdmin guard dependency; its value is not used.

    Returns:
        Transaction response models; ``mandateId`` / ``mandateName`` are not
        populated by this endpoint.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        interface = getBillingInterface(ctx.user, targetMandateId)
        rows = interface.getTransactionsByMandate(targetMandateId, limit=limit)
        return [
            TransactionResponse(
                id=row.get("id"),
                accountId=row.get("accountId"),
                transactionType=TransactionTypeEnum(row.get("transactionType", "DEBIT")),
                amount=row.get("amount", 0.0),
                description=row.get("description", ""),
                referenceType=ReferenceTypeEnum(row["referenceType"]) if row.get("referenceType") else None,
                workflowId=row.get("workflowId"),
                featureCode=row.get("featureCode"),
                aicoreProvider=row.get("aicoreProvider"),
                createdAt=row.get("_createdAt")
            )
            for row in rows
        ]
    except Exception as e:
        logger.error(f"Error getting billing transactions for mandate {targetMandateId}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Mandate View Endpoints (for Admins)
# =============================================================================
@router.get("/view/mandates/balances", response_model=List[MandateBalanceResponse])
@limiter.limit("30/minute")
async def getMandateViewBalances(
    request: Request,
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    Return the per-mandate balance overview (SysAdmin only).

    Every dict returned by ``getMandateBalances()`` is unpacked into a
    ``MandateBalanceResponse``.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        interface = getBillingInterface(ctx.user, ctx.mandateId)
        overview = []
        for entry in interface.getMandateBalances():
            overview.append(MandateBalanceResponse(**entry))
        return overview
    except Exception as e:
        logger.error(f"Error getting mandate view balances: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/view/mandates/transactions", response_model=List[TransactionResponse])
@limiter.limit("30/minute")
async def getMandateViewTransactions(
    request: Request,
    limit: int = Query(default=100, ge=1, le=1000),
    ctx: RequestContext = Depends(getRequestContext),
    _admin = Depends(requireSysAdmin)
):
    """
    List transactions across mandates (SysAdmin only).

    Args:
        request: Incoming request (required by the rate limiter).
        limit: Maximum number of transactions to fetch, 1-1000.
        ctx: Request context carrying the authenticated user and mandate.
        _admin: SysAdmin guard dependency; its value is not used.

    Returns:
        Transaction response models including the mandate id and name of each
        transaction.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        interface = getBillingInterface(ctx.user, ctx.mandateId)
        rows = interface.getMandateTransactions(limit=limit)
        return [
            TransactionResponse(
                id=row.get("id"),
                accountId=row.get("accountId"),
                transactionType=TransactionTypeEnum(row.get("transactionType", "DEBIT")),
                amount=row.get("amount", 0.0),
                description=row.get("description", ""),
                referenceType=ReferenceTypeEnum(row["referenceType"]) if row.get("referenceType") else None,
                workflowId=row.get("workflowId"),
                featureCode=row.get("featureCode"),
                aicoreProvider=row.get("aicoreProvider"),
                createdAt=row.get("_createdAt"),
                mandateId=row.get("mandateId"),
                mandateName=row.get("mandateName")
            )
            for row in rows
        ]
    except Exception as e:
        logger.error(f"Error getting mandate view transactions: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# User View Endpoints (RBAC-based)
# =============================================================================
@router.get("/view/users/balances", response_model=List[UserBalanceResponse])
@limiter.limit("30/minute")
async def getUserViewBalances(
    request: Request,
    ctx: RequestContext = Depends(getRequestContext)
):
    """
    Return user-level balances visible to the caller.

    - SysAdmin: no mandate filter is applied (``None`` is passed to the
      interface) and every returned balance is included.
    - Everyone else: balances are fetched for the mandates the caller belongs
      to and then reduced to the entries whose ``userId`` is the caller's id.
      An empty list is returned when the caller has no mandates.

    NOTE(review): an earlier description of this endpoint also promised a
    MandateAdmin view; this handler has no such branch.

    Raises:
        HTTPException: 500 with the exception text when anything fails.
    """
    try:
        interface = getBillingInterface(ctx.user, ctx.mandateId)

        if ctx.user.isSysAdmin:
            # None means "no mandate restriction" for the interface call
            return [UserBalanceResponse(**row) for row in interface.getUserBalancesForMandates(None)]

        # Collect the ids of the mandates the caller is a member of
        from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
        memberships = getAppInterface(ctx.user).getUserMandates(ctx.user.id)
        accessibleIds = []
        for membership in memberships:
            # Memberships may be model objects or plain dicts
            candidate = getattr(membership, 'mandateId', None)
            if not candidate and isinstance(membership, dict):
                candidate = membership.get("mandateId")
            if candidate:
                accessibleIds.append(candidate)
        if not accessibleIds:
            return []

        # Non-admin callers only get their own balances
        ownRows = [
            row
            for row in interface.getUserBalancesForMandates(accessibleIds)
            if row.get("userId") == ctx.user.id
        ]
        return [UserBalanceResponse(**row) for row in ownRows]
    except Exception as e:
        logger.error(f"Error getting user view balances: {e}")
        raise HTTPException(status_code=500, detail=str(e))
class ViewStatisticsResponse(BaseModel):
    """Aggregated statistics across all user's mandates.

    Response of ``GET /view/statistics``. Every field has a default, so the
    handler can return an empty instance when the caller has no mandates.
    """
    # Sum of the amounts of the DEBIT transactions in the requested period.
    totalCost: float = 0.0
    # Number of DEBIT transactions in the requested period.
    transactionCount: int = 0
    # Cost per AICore provider ("unknown" when a transaction names none).
    costByProvider: Dict[str, float] = {}
    # Cost per "<mandate> / <featureCode>" key.
    costByFeature: Dict[str, float] = {}
    # Cost per mandate name (or mandate id when no name is present).
    costByMandate: Dict[str, float] = {}
    # Entries of the form {"date": ..., "cost": ..., "count": ...}.
    timeSeries: List[Dict[str, Any]] = []
def _parseTransactionDate(createdAt: Any) -> Optional[date]:
    """
    Convert a stored ``_createdAt`` value into a calendar date.

    Accepts numeric Unix timestamps, ``datetime`` / ``date`` objects, and
    strings holding either a numeric timestamp or an ISO-8601 value (a
    trailing "Z" is accepted). Returns None when the value cannot be
    interpreted, including timestamps outside the platform's supported range.
    """
    if isinstance(createdAt, datetime):
        return createdAt.date()
    if isinstance(createdAt, date):
        return createdAt
    try:
        if isinstance(createdAt, (int, float)):
            return datetime.fromtimestamp(createdAt).date()
        if isinstance(createdAt, str):
            try:
                # Try as float string first (Unix timestamp)
                return datetime.fromtimestamp(float(createdAt)).date()
            except ValueError:
                return datetime.fromisoformat(createdAt.replace("Z", "+00:00")).date()
    except (ValueError, TypeError, OverflowError, OSError):
        # One unreadable timestamp must not fail the whole report
        return None
    return None


def _isDebitTransaction(txType: Any) -> bool:
    """Tell whether a transaction type (plain string or enum member) is DEBIT."""
    if txType is None:
        return False
    if str(txType) in ("DEBIT", "TransactionTypeEnum.DEBIT"):
        return True
    # Enum members whose str() is neither form above still carry .value
    return getattr(txType, "value", None) == "DEBIT"


@router.get("/view/statistics")
@limiter.limit("30/minute")
async def getUserViewStatistics(
    request: Request,
    period: str = Query(default="month", description="Period: 'day' or 'month'"),
    year: Optional[int] = Query(default=None, description="Year"),
    month: Optional[int] = Query(None, description="Month (1-12, required for period='day')"),
    ctx: RequestContext = Depends(getRequestContext)
) -> ViewStatisticsResponse:
    """
    Get aggregated usage statistics across all user's mandates.
    - period='month': returns monthly time series for the given year
    - period='day': returns daily time series for the given month/year

    Only DEBIT transactions inside the selected range are counted. Any period
    other than 'day' is treated like 'month'. A missing year defaults to the
    current year, and for period 'day' a missing month defaults to the current
    month. The daily series lists only days with at least one transaction;
    the monthly series always has twelve entries.

    Args:
        request: Incoming request (required by the rate limiter).
        period: 'day' or 'month'.
        year: Calendar year; current year when omitted.
        month: Month 1-12, used for period 'day' only.
        ctx: Request context carrying the authenticated user and mandate.

    Raises:
        HTTPException: 400 for an out-of-range year or month; 500 with the
            exception text for unexpected failures.
    """
    try:
        now = datetime.now()
        if year is None:
            year = now.year
        if period == "day" and not month:
            month = now.month

        # Reject values date() cannot represent instead of failing with a 500.
        # The range end uses year + 1, hence the exclusive upper bound.
        if not date.min.year <= year < date.max.year:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid year. Use a value from {date.min.year} to {date.max.year - 1}"
            )
        if period == "day" and not 1 <= month <= 12:
            raise HTTPException(status_code=400, detail="Invalid month. Use a value from 1 to 12")

        billingInterface = getBillingInterface(ctx.user, ctx.mandateId)

        # Get all mandates the user has access to
        if ctx.user.isSysAdmin:
            mandateIds = None
        else:
            from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
            appInterface = getAppInterface(ctx.user)
            userMandates = appInterface.getUserMandates(ctx.user.id)
            mandateIds = []
            for um in userMandates:
                mandateId = getattr(um, 'mandateId', None) or (um.get("mandateId") if isinstance(um, dict) else None)
                if mandateId:
                    mandateIds.append(mandateId)
            if not mandateIds:
                logger.warning("No mandate IDs found for user")
                return ViewStatisticsResponse()

        # Get all transactions
        allTransactions = billingInterface.getUserTransactionsForMandates(mandateIds, limit=10000)
        logger.info(f"View statistics: {len(allTransactions)} total transactions fetched for period={period}, year={year}, month={month}")

        # Calculate date range as [startDate, endDate)
        if period == "day":
            startDate = date(year, month, 1)
            endDate = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        else:
            startDate = date(year, 1, 1)
            endDate = date(year + 1, 1, 1)

        # Filter by date range and only DEBIT transactions. The parsed date is
        # kept next to the transaction instead of being written into the dict,
        # which belongs to the interface layer.
        debits: List[Any] = []
        skippedNoDate = 0
        skippedDateRange = 0
        skippedNotDebit = 0
        for t in allTransactions:
            createdAt = t.get("_createdAt")
            txDate = _parseTransactionDate(createdAt) if createdAt else None
            if txDate is None:
                skippedNoDate += 1
                continue
            if txDate < startDate or txDate >= endDate:
                skippedDateRange += 1
                continue
            if not _isDebitTransaction(t.get("transactionType")):
                skippedNotDebit += 1
                continue
            debits.append((txDate, t))
        logger.info(f"View statistics: {len(debits)} DEBIT transactions after filter. "
                    f"Skipped: noDate={skippedNoDate}, dateRange={skippedDateRange}, notDebit={skippedNotDebit}")

        # Aggregate totals and time-series buckets in a single pass
        totalCost = 0
        costByProvider: Dict[str, float] = {}
        costByFeature: Dict[str, float] = {}
        costByMandate: Dict[str, float] = {}
        bucketCost: Dict[Any, float] = {}
        bucketCount: Dict[Any, int] = {}
        for txDate, t in debits:
            # A stored None amount counts as zero
            amount = t.get("amount") or 0
            totalCost += amount

            provider = t.get("aicoreProvider") or "unknown"
            costByProvider[provider] = costByProvider.get(provider, 0) + amount

            mandate = t.get("mandateName") or t.get("mandateId") or "unknown"
            featureCode = t.get("featureCode") or "unknown"
            featureKey = f"{mandate} / {featureCode}"
            costByFeature[featureKey] = costByFeature.get(featureKey, 0) + amount
            costByMandate[mandate] = costByMandate.get(mandate, 0) + amount

            # Daily series buckets by date; monthly series by month number
            bucketKey = txDate if period == "day" else txDate.month
            bucketCost[bucketKey] = bucketCost.get(bucketKey, 0) + amount
            bucketCount[bucketKey] = bucketCount.get(bucketKey, 0) + 1

        # Build time series (raw data only, no display logic)
        timeSeries = []
        if period == "day":
            # Only days that actually have transactions are listed
            for d in sorted(bucketCount):
                timeSeries.append({
                    "date": d.isoformat(),
                    "cost": round(bucketCost[d], 4),
                    "count": bucketCount[d]
                })
        else:
            # Always twelve entries, zero-filled for months without activity
            for m in range(1, 13):
                timeSeries.append({
                    "date": f"{year}-{m:02d}",
                    "cost": round(bucketCost.get(m, 0), 4),
                    "count": bucketCount.get(m, 0)
                })

        return ViewStatisticsResponse(
            totalCost=round(totalCost, 4),
            transactionCount=len(debits),
            costByProvider=costByProvider,
            costByFeature=costByFeature,
            costByMandate=costByMandate,
            timeSeries=timeSeries
        )
    except HTTPException:
        # Keep the 400 responses raised above from being rewrapped as 500
        raise
    except Exception as e:
        logger.error(f"Error getting view statistics: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/view/users/transactions", response_model=PaginatedResponse[UserTransactionResponse])
@limiter.limit("30/minute")
async def getUserViewTransactions(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    ctx: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[UserTransactionResponse]:
    """
    Get user-level transactions with pagination support.

    - SysAdmin: no mandate filter is applied.
    - Everyone else: transactions are fetched for the mandates the caller
      belongs to; an empty result is returned when there are none.

    NOTE(review): unlike the user balances endpoint, this handler does not
    reduce non-admin results to the caller's own ``userId``. Confirm whether
    ``getUserTransactionsForMandates`` enforces that, since the endpoint was
    described as showing regular users only their own transactions.

    At most 10000 transactions are fetched before filtering, sorting and
    paging are applied in memory.

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination

    Raises:
        HTTPException: 400 when ``pagination`` is not a valid JSON object or
            fails validation; 500 with the exception text for unexpected
            failures.
    """
    try:
        billingInterface = getBillingInterface(ctx.user, ctx.mandateId)

        # Parse pagination params; bad client input is a 400, not a 500
        paginationParams = None
        if pagination:
            import json
            try:
                paginationDict = json.loads(pagination)
                if not isinstance(paginationDict, dict):
                    raise ValueError("pagination must be a JSON object")
                paginationDict = normalize_pagination_dict(paginationDict)
                paginationParams = PaginationParams(**paginationDict)
            except (ValueError, TypeError) as e:
                # JSON decode errors and pydantic validation errors are ValueErrors
                raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {e}")

        # Determine which mandates the user has access to
        if ctx.user.isSysAdmin:
            # SysAdmin sees all
            mandateIds = None
        else:
            # Get mandates where user has access
            from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
            appInterface = getAppInterface(ctx.user)
            userMandates = appInterface.getUserMandates(ctx.user.id)
            mandateIds = []
            for um in userMandates:
                mandateId = getattr(um, 'mandateId', None) or (um.get("mandateId") if isinstance(um, dict) else None)
                if mandateId:
                    mandateIds.append(mandateId)
            if not mandateIds:
                return PaginatedResponse(items=[], pagination=None)

        allTransactions = billingInterface.getUserTransactionsForMandates(mandateIds, limit=10000)
        logger.debug(f"Found {len(allTransactions)} transactions for mandates {mandateIds}")

        # Project to dicts keyed like the response model so that filters and
        # sort fields refer to response field names (e.g. "createdAt")
        transactionDicts = [
            {
                "id": t.get("id"),
                "accountId": t.get("accountId"),
                "transactionType": t.get("transactionType", "DEBIT"),
                "amount": t.get("amount", 0.0),
                "description": t.get("description", ""),
                "referenceType": t.get("referenceType"),
                "workflowId": t.get("workflowId"),
                "featureCode": t.get("featureCode"),
                "aicoreProvider": t.get("aicoreProvider"),
                "createdAt": t.get("_createdAt"),
                "mandateId": t.get("mandateId"),
                "mandateName": t.get("mandateName"),
                "userId": t.get("userId"),
                "userName": t.get("userName"),
            }
            for t in allTransactions
        ]

        # Apply filters and sorting
        filteredDicts = _applyFiltersAndSort(transactionDicts, paginationParams)

        # Convert to response models
        def _toResponse(d):
            return UserTransactionResponse(
                id=d.get("id"),
                accountId=d.get("accountId"),
                transactionType=TransactionTypeEnum(d.get("transactionType", "DEBIT")),
                amount=d.get("amount", 0.0),
                description=d.get("description", ""),
                referenceType=ReferenceTypeEnum(d["referenceType"]) if d.get("referenceType") else None,
                workflowId=d.get("workflowId"),
                featureCode=d.get("featureCode"),
                aicoreProvider=d.get("aicoreProvider"),
                createdAt=d.get("createdAt"),
                mandateId=d.get("mandateId"),
                mandateName=d.get("mandateName"),
                userId=d.get("userId"),
                userName=d.get("userName")
            )

        if not paginationParams:
            return PaginatedResponse(
                items=[_toResponse(d) for d in filteredDicts],
                pagination=None
            )

        import math
        totalItems = len(filteredDicts)
        totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
        startIdx = (paginationParams.page - 1) * paginationParams.pageSize
        endIdx = startIdx + paginationParams.pageSize
        return PaginatedResponse(
            items=[_toResponse(d) for d in filteredDicts[startIdx:endIdx]],
            pagination=PaginationMetadata(
                currentPage=paginationParams.page,
                pageSize=paginationParams.pageSize,
                totalItems=totalItems,
                totalPages=totalPages,
                sort=paginationParams.sort,
                filters=paginationParams.filters
            )
        )
    except HTTPException:
        # Keep the 400 raised above from being rewrapped as 500
        raise
    except Exception as e:
        logger.error(f"Error getting user view transactions: {e}")
        raise HTTPException(status_code=500, detail=str(e))