gateway/modules/routes/routeStore.py
2026-03-24 14:16:46 +01:00

422 lines
17 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Feature Store routes.
Own Instance Pattern: Each activation creates a new FeatureInstance
in the user's explicit mandate. Supports Orphan Control.
"""
from fastapi import APIRouter, HTTPException, Depends, Request
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from pydantic import BaseModel, Field
from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelFeatures import FeatureInstance
from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
from modules.datamodels.datamodelRbac import AccessRuleContext, Role
from modules.datamodels.datamodelUam import Mandate
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.security.rbacCatalog import getCatalogService
from modules.security.rbac import RbacClass
from modules.security.rootAccess import getRootDbAppConnector
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router carrying every Feature Store endpoint; all paths below are
# relative to the /api/store prefix and grouped under the "Store" tag.
router = APIRouter(
    prefix="/api/store",
    tags=["Store"],
    responses={404: {"description": "Not found"}},
)
class StoreActivateRequest(BaseModel):
    """Request body for activating a store feature."""

    # Code of the feature that should be activated.
    featureCode: str = Field(..., description="Feature code to activate")

    # Optional explicit target mandate; when omitted, the activate endpoint
    # resolves (or provisions) the mandate itself.
    mandateId: Optional[str] = Field(
        default=None,
        description=(
            "Target mandate ID (explicit). If None and user has no admin mandate, "
            "auto-creates personal mandate."
        ),
    )
class StoreDeactivateRequest(BaseModel):
    """Request body for deactivating a store feature; all fields are required."""

    # Code of the feature being deactivated.
    featureCode: str = Field(..., description="Feature code to deactivate")

    # Mandate the instance is expected to live in.
    mandateId: str = Field(..., description="Mandate ID")

    # The concrete FeatureInstance the caller wants to leave.
    instanceId: str = Field(..., description="FeatureInstance ID to deactivate")
class StoreFeatureResponse(BaseModel):
    """One store feature as returned by the feature listing endpoint."""

    # Code identifying the feature.
    featureCode: str

    # Label texts as a string-to-string mapping.
    label: Dict[str, str]

    # Icon identifier for the feature.
    icon: str

    # Description texts as a string-to-string mapping; empty when not provided.
    description: Dict[str, str] = {}

    # Instance entries of this feature for the requesting user; empty by default.
    instances: List[Dict[str, Any]] = []

    # Whether the feature may be activated.
    canActivate: bool
def _getStoreFeatures(catalogService) -> List[Dict[str, Any]]:
"""Get all features available in the store."""
resourceObjects = catalogService.getResourceObjects()
storeFeatures = []
for obj in resourceObjects:
meta = obj.get("meta", {})
if meta.get("category") == "store":
featureCode = meta.get("featureCode")
if featureCode:
featureDef = catalogService.getFeatureDefinition(featureCode)
if featureDef:
storeFeatures.append(featureDef)
return storeFeatures
def _isUserAdminInMandate(db, userId: str, mandateId: str) -> bool:
    """
    Tell whether the user holds an admin role in the given mandate.

    Only the first enabled UserMandate row for the user/mandate pair is
    inspected. A role counts as admin when its ``roleLabel`` contains
    "admin" (case-insensitive).

    Args:
        db: Database connector exposing ``getRecordset``.
        userId: ID of the user to check.
        mandateId: ID of the mandate to check.

    Returns:
        True if an admin role is linked to the membership, otherwise False.
    """
    memberships = db.getRecordset(
        UserMandate,
        recordFilter={"userId": userId, "mandateId": mandateId, "enabled": True},
    )
    if not memberships:
        return False

    membershipId = memberships[0].get("id")
    assignments = db.getRecordset(UserMandateRole, recordFilter={"userMandateId": membershipId})
    for assignment in assignments:
        candidates = db.getRecordset(Role, recordFilter={"id": assignment.get("roleId")})
        # Stop at the first role whose label mentions "admin".
        if any("admin" in (candidate.get("roleLabel") or "").lower() for candidate in candidates):
            return True
    return False
def _getUserAdminMandateIds(db, userId: str) -> List[str]:
    """
    Return the IDs of all non-system mandates in which the user is admin.

    Args:
        db: Database connector exposing ``getRecordset``.
        userId: ID of the user whose enabled memberships are scanned.

    Returns:
        Mandate IDs in the order of the user's enabled UserMandate rows.
    """
    memberships = db.getRecordset(UserMandate, recordFilter={"userId": userId, "enabled": True})
    collected = []
    for membership in memberships:
        candidateId = membership.get("mandateId")
        mandateRows = db.getRecordset(Mandate, recordFilter={"id": candidateId})
        # Mandates flagged as system are never offered; a mandate row that
        # cannot be found is not treated as system.
        flaggedSystem = mandateRows[0].get("isSystem") if mandateRows else False
        if not flaggedSystem and _isUserAdminInMandate(db, userId, candidateId):
            collected.append(candidateId)
    return collected
def _getUserInstancesForFeature(db, userId: str, featureCode: str, mandateIds: List[str]) -> List[Dict[str, Any]]:
    """
    List the instances of a feature that the user has a FeatureAccess for.

    For every mandate in ``mandateIds`` the FeatureInstance rows of
    ``featureCode`` are fetched; an instance is reported when at least one
    FeatureAccess row links it to ``userId``.

    Args:
        db: Database connector exposing ``getRecordset``.
        userId: ID of the user whose accesses are checked.
        featureCode: Feature code to look up.
        mandateIds: Mandates to search, in output order.

    Returns:
        One dict per accessible instance with the keys ``instanceId``,
        ``mandateId``, ``mandateName``, ``label`` and ``isActive``.
        ``mandateName`` is the mandate's label, else its name, else the
        mandate ID.
    """
    instances: List[Dict[str, Any]] = []
    for mandateId in mandateIds:
        # The display name is identical for every instance of this mandate,
        # so it is resolved lazily and at most once per mandate.
        mandateName = None
        mandateInstances = db.getRecordset(
            FeatureInstance,
            recordFilter={"mandateId": mandateId, "featureCode": featureCode}
        ) or []
        for inst in mandateInstances:
            instanceId = inst.get("id")
            accesses = db.getRecordset(
                FeatureAccess,
                recordFilter={"userId": userId, "featureInstanceId": instanceId}
            )
            if not accesses:
                continue
            if mandateName is None:
                mandate = db.getRecordset(Mandate, recordFilter={"id": mandateId})
                mandateRecord = mandate[0] if mandate else {}
                # Fall back to the ID so the name is never None, even for a
                # mandate that has neither label nor name.
                mandateName = mandateRecord.get("label") or mandateRecord.get("name") or mandateId
            instances.append({
                "instanceId": instanceId,
                "mandateId": mandateId,
                "mandateName": mandateName,
                "label": inst.get("label", ""),
                "isActive": True,
            })
    return instances
@router.get("/mandates", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def listUserMandates(
    request: Request,
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    List mandates where the user can activate features (admin mandates).

    Returns:
        One dict per admin mandate with the keys ``id``, ``name``, ``label``
        (falls back to the name) and ``mandateType`` (defaults to "company").

    Raises:
        HTTPException: 500 on unexpected errors; HTTPExceptions raised while
            building the list are passed through unchanged.
    """
    try:
        rootInterface = getRootInterface()
        db = rootInterface.db
        userId = str(context.user.id)
        result = []
        for mid in _getUserAdminMandateIds(db, userId):
            records = db.getRecordset(Mandate, recordFilter={"id": mid})
            if not records:
                # Mandate row is gone; skip it instead of failing the listing.
                continue
            m = records[0]
            result.append({
                "id": mid,
                "name": m.get("name", ""),
                "label": m.get("label") or m.get("name", ""),
                "mandateType": m.get("mandateType", "company"),
            })
        return result
    except HTTPException:
        # Same convention as the other store handlers: never rewrap a
        # deliberate HTTP error into a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error listing user mandates: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get("/subscription-info", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getSubscriptionInfo(
    request: Request,
    mandateId: Optional[str] = None,
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Get subscription info for a mandate (plan, limits).

    Args:
        request: Incoming request (required by the rate limiter).
        mandateId: Mandate to inspect. When omitted, the first mandate in
            which the user is admin is used.
        context: Request context carrying the authenticated user.

    Returns:
        A dict with ``plan``, ``status``, ``maxDataVolumeMB``,
        ``maxFeatureInstances``, ``currentFeatureInstances`` and
        ``trialEndsAt``. When no mandate or subscription is found, or an
        unexpected error occurs, only ``plan``, ``maxDataVolumeMB`` and
        ``maxFeatureInstances`` are returned, all set to None.

    Raises:
        HTTPException: 403 when an explicit ``mandateId`` is given and the
            user has no enabled membership in that mandate.
    """
    emptyInfo = {"plan": None, "maxDataVolumeMB": None, "maxFeatureInstances": None}
    try:
        rootInterface = getRootInterface()
        db = rootInterface.db
        userId = str(context.user.id)
        if mandateId:
            # The lookups below go through the root interface, so a
            # caller-supplied mandate ID has to be checked here; otherwise
            # any authenticated user could read another mandate's plan data.
            # NOTE(review): membership is the minimum bar; confirm whether
            # admin rights should be required instead.
            memberships = db.getRecordset(
                UserMandate,
                recordFilter={"userId": userId, "mandateId": mandateId, "enabled": True}
            )
            if not memberships:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No access to mandate")
        else:
            adminMandateIds = _getUserAdminMandateIds(db, userId)
            if adminMandateIds:
                mandateId = adminMandateIds[0]
        if not mandateId:
            return dict(emptyInfo)
        from modules.datamodels.datamodelSubscription import MandateSubscription, BUILTIN_PLANS
        subs = db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId})
        if not subs:
            return dict(emptyInfo)
        sub = subs[0]
        plan = BUILTIN_PLANS.get(sub.get("planKey"))
        currentInstances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
        return {
            "plan": sub.get("planKey"),
            "status": sub.get("status"),
            "maxDataVolumeMB": plan.maxDataVolumeMB if plan else None,
            "maxFeatureInstances": plan.maxFeatureInstances if plan else None,
            "currentFeatureInstances": len(currentInstances),
            "trialEndsAt": sub.get("trialEndsAt"),
        }
    except HTTPException:
        # Authorization failures must reach the client, not be masked by the
        # best-effort fallback below.
        raise
    except Exception as e:
        # Best effort by design: unexpected errors yield the empty payload.
        logger.error(f"Error getting subscription info: {e}")
        return dict(emptyInfo)
@router.get("/features", response_model=List[StoreFeatureResponse])
@limiter.limit("60/minute")
def listStoreFeatures(
    request: Request,
    context: RequestContext = Depends(getRequestContext)
) -> List[StoreFeatureResponse]:
    """
    Return every store feature together with the caller's instances of it.

    Instances are searched in all enabled, non-system mandates of the user.
    ``canActivate`` is always reported as True.

    Raises:
        HTTPException: 500 on unexpected errors; HTTPExceptions raised
            inside are passed through unchanged.
    """
    try:
        rootInterface = getRootInterface()
        db = rootInterface.db
        catalogService = getCatalogService()
        userId = str(context.user.id)

        # Narrow the user's enabled memberships down to non-system mandates
        # whose Mandate row can be found.
        memberships = db.getRecordset(UserMandate, recordFilter={"userId": userId, "enabled": True})
        visibleMandateIds = []
        for membership in memberships:
            candidateId = membership.get("mandateId")
            mandateRows = db.getRecordset(Mandate, recordFilter={"id": candidateId})
            if not mandateRows or mandateRows[0].get("isSystem"):
                continue
            visibleMandateIds.append(candidateId)

        return [
            StoreFeatureResponse(
                featureCode=definition["code"],
                label=definition.get("label", {}),
                icon=definition.get("icon", "mdi-puzzle"),
                instances=_getUserInstancesForFeature(db, userId, definition["code"], visibleMandateIds),
                canActivate=True,
            )
            for definition in _getStoreFeatures(catalogService)
        ]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing store features: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
def _resolveActivationMandateId(rootInterface, db, context, userId: str, requestedMandateId: Optional[str]) -> str:
    """Return the mandate to activate in, provisioning a personal mandate when the user has no admin mandate."""
    if requestedMandateId:
        return requestedMandateId
    adminMandateIds = _getUserAdminMandateIds(db, userId)
    if len(adminMandateIds) > 1:
        # Ambiguous target: the caller has to choose explicitly.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="mandateId is required when user has multiple admin mandates"
        )
    if adminMandateIds:
        return adminMandateIds[0]
    provisionResult = rootInterface._provisionMandateForUser(
        userId=userId,
        mandateType="personal",
        mandateName=context.user.fullName or context.user.username,
        planKey="TRIAL_7D",
    )
    mandateId = provisionResult["mandateId"]
    logger.info(f"Auto-created personal mandate {mandateId} for user {userId} via store")
    return mandateId


def _ensureFeatureInstanceCapacity(db, mandateId: str) -> None:
    """Raise 402 when the mandate's plan does not allow another feature instance."""
    from modules.datamodels.datamodelSubscription import MandateSubscription, BUILTIN_PLANS
    subs = db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId})
    if not subs:
        return
    plan = BUILTIN_PLANS.get(subs[0].get("planKey"))
    if not plan or plan.maxFeatureInstances is None:
        # Unknown plan or no limit configured: nothing to enforce.
        return
    currentInstances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
    if len(currentInstances) >= plan.maxFeatureInstances:
        raise HTTPException(
            status_code=status.HTTP_402_PAYMENT_REQUIRED,
            detail=f"Feature instance limit reached ({plan.maxFeatureInstances}). Upgrade your plan."
        )


def _grantInstanceAdminAccess(db, userId: str, instanceId: str) -> None:
    """Create the user's FeatureAccess for the instance and link the instance's admin role if one exists."""
    instanceRoles = db.getRecordset(Role, recordFilter={"featureInstanceId": instanceId}) or []
    adminRoleId = next(
        (ir.get("id") for ir in instanceRoles if "admin" in (ir.get("roleLabel") or "").lower()),
        None,
    )
    fa = FeatureAccess(userId=userId, featureInstanceId=instanceId, enabled=True)
    createdFa = db.recordCreate(FeatureAccess, fa.model_dump())
    if not createdFa:
        # Without an access row the activating user could never reach the
        # instance; let the caller roll the activation back.
        raise RuntimeError("Failed to create feature access")
    if adminRoleId:
        far = FeatureAccessRole(featureAccessId=createdFa["id"], roleId=adminRoleId)
        db.recordCreate(FeatureAccessRole, far.model_dump())
    else:
        logger.warning(f"No admin role found for instance {instanceId}; access granted without role")


def _rollbackFeatureInstance(db, instanceId: str) -> None:
    """Best-effort removal of a just-created instance and the access rows pointing at it."""
    try:
        leftovers = db.getRecordset(FeatureAccess, recordFilter={"featureInstanceId": instanceId}) or []
        for access in leftovers:
            db.recordDelete(FeatureAccess, access.get("id"))
        db.recordDelete(FeatureInstance, instanceId)
    except Exception as cleanupError:
        logger.error(f"Failed to roll back feature instance {instanceId}: {cleanupError}")


@router.post("/activate", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def activateStoreFeature(
    request: Request,
    data: StoreActivateRequest,
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Activate a store feature. Creates a new FeatureInstance in the target mandate.
    If mandateId is None and user has no admin mandate, auto-creates a personal mandate.

    Steps: validate the feature, resolve the target mandate, verify the user
    is admin there, enforce the plan's instance limit, create the instance,
    grant the user access with the instance's admin role, then sync the
    subscription quantity (best effort).

    Returns:
        Dict with ``featureCode``, ``mandateId``, ``instanceId`` and
        ``activated``.

    Raises:
        HTTPException: 404 if the feature is unknown or not offered in the
            store, 400 if the mandate is ambiguous, 403 if the user is not
            admin in the target mandate, 402 if the plan's instance limit is
            reached, 500 on any other failure. If granting access fails, the
            new instance is removed again before the 500 is returned.
    """
    featureCode = data.featureCode
    userId = str(context.user.id)
    try:
        rootInterface = getRootInterface()
        db = rootInterface.db
        catalogService = getCatalogService()
        featureDef = catalogService.getFeatureDefinition(featureCode)
        if not featureDef:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Feature '{featureCode}' not found")
        # Only features that the store actually lists may be activated here;
        # other catalog features must not be reachable through this endpoint.
        storeFeatureCodes = {storeDef.get("code") for storeDef in _getStoreFeatures(catalogService)}
        if featureCode not in storeFeatureCodes:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature '{featureCode}' is not available in the store"
            )

        mandateId = _resolveActivationMandateId(rootInterface, db, context, userId, data.mandateId)

        # Verify user is admin in target mandate
        if not _isUserAdminInMandate(db, userId, mandateId):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not admin in target mandate")

        # Check subscription capacity
        _ensureFeatureInstanceCapacity(db, mandateId)

        # Create new FeatureInstance
        featureInterface = getFeatureInterface(db)
        # "label" may be missing or None; fall back to the feature code.
        featureLabel = (featureDef.get("label") or {}).get("en", featureCode)
        instance = featureInterface.createFeatureInstance(
            featureCode=featureCode,
            mandateId=mandateId,
            label=featureLabel,
            enabled=True,
            copyTemplateRoles=True,
        )
        if not instance:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create feature instance")
        instanceId = instance.get("id") if isinstance(instance, dict) else instance.id

        # Grant FeatureAccess with admin role. An instance nobody can access
        # would still count against the plan limit, so undo it on failure.
        try:
            _grantInstanceAdminAccess(db, userId, instanceId)
        except Exception:
            _rollbackFeatureInstance(db, instanceId)
            raise

        # Sync subscription quantity
        try:
            rootInterface._syncSubscriptionQuantity(mandateId)
        except Exception as e:
            logger.warning(f"Failed to sync subscription quantity: {e}")
        logger.info(f"User {userId} activated '{featureCode}' in mandate {mandateId} (instance={instanceId})")
        return {
            "featureCode": featureCode,
            "mandateId": mandateId,
            "instanceId": instanceId,
            "activated": True,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error activating store feature '{featureCode}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.post("/deactivate", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def deactivateStoreFeature(
    request: Request,
    data: StoreDeactivateRequest,
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Deactivate a store feature. Removes user's FeatureAccess.
    Orphan Control: if last user deactivates, FeatureInstance is deleted.

    All FeatureAccess rows linking the user to the instance are removed,
    together with their FeatureAccessRole links. The subscription quantity
    is synced afterwards (best effort).

    Returns:
        Dict with ``featureCode`` (taken from the stored instance, falling
        back to the request), ``mandateId``, ``instanceId``, ``deactivated``
        and ``instanceDeleted``.

    Raises:
        HTTPException: 404 if the instance is not in the mandate or the user
            has no access to it; 500 on unexpected errors.
    """
    userId = str(context.user.id)
    instanceId = data.instanceId
    mandateId = data.mandateId
    try:
        rootInterface = getRootInterface()
        db = rootInterface.db
        # Verify instance exists in mandate
        instances = db.getRecordset(FeatureInstance, recordFilter={"id": instanceId, "mandateId": mandateId})
        if not instances:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Feature instance not found in mandate")
        # Find user's FeatureAccess
        accesses = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instanceId})
        if not accesses:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No active access found")
        # Remove every access row of this user, not just the first one;
        # a leftover duplicate would keep the user attached to the instance
        # and block orphan control.
        for access in accesses:
            featureAccessId = access.get("id")
            # Drop the role links first so they do not outlive the access.
            # Best effort: a cleanup problem must not block deactivation.
            # NOTE(review): assumes FeatureAccessRole rows carry an "id"
            # usable with recordDelete; confirm against the data model.
            try:
                roleLinks = db.getRecordset(FeatureAccessRole, recordFilter={"featureAccessId": featureAccessId}) or []
                for link in roleLinks:
                    if link.get("id"):
                        db.recordDelete(FeatureAccessRole, link.get("id"))
            except Exception as e:
                logger.warning(f"Failed to remove role links of access {featureAccessId}: {e}")
            db.recordDelete(FeatureAccess, featureAccessId)
        # Orphan Control: check if any FeatureAccess remains
        remainingAccesses = db.getRecordset(FeatureAccess, recordFilter={"featureInstanceId": instanceId})
        instanceDeleted = False
        if not remainingAccesses:
            db.recordDelete(FeatureInstance, instanceId)
            instanceDeleted = True
            logger.info(f"Orphan Control: deleted instance {instanceId} (no remaining accesses)")
        # Sync subscription quantity
        try:
            rootInterface._syncSubscriptionQuantity(mandateId)
        except Exception as e:
            logger.warning(f"Failed to sync subscription quantity: {e}")
        logger.info(f"User {userId} deactivated instance {instanceId} in mandate {mandateId} (deleted={instanceDeleted})")
        return {
            # Report the instance's own feature code; the request value is
            # client-supplied and never checked against the instance.
            "featureCode": instances[0].get("featureCode") or data.featureCode,
            "mandateId": mandateId,
            "instanceId": instanceId,
            "deactivated": True,
            "instanceDeleted": instanceDeleted,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deactivating store feature: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))