gateway/modules/routes/routeStore.py
2026-02-23 17:13:50 +01:00

344 lines
12 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Feature Store routes.
Allows users to self-activate features in the root mandate's shared instances.
Architecture: Shared Instance Pattern
- Each store feature has exactly 1 instance in the root mandate (created at bootstrap)
- Users activate by getting FeatureAccess + user-role on the shared instance
- Data isolation is guaranteed by read="m" (WHERE _createdBy = userId)
"""
from fastapi import APIRouter, HTTPException, Depends, Request
from typing import List, Dict, Any
from fastapi import status
import logging
from pydantic import BaseModel, Field
from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelFeatures import FeatureInstance
from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.datamodels.datamodelUam import Mandate
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.security.rbacCatalog import getCatalogService
from modules.security.rbac import RbacClass
from modules.security.rootAccess import getRootDbAppConnector
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/store",
tags=["Store"],
responses={404: {"description": "Not found"}}
)
class StoreActivateRequest(BaseModel):
"""Request model for activating a store feature."""
featureCode: str = Field(..., description="Feature code to activate (e.g., 'automation')")
class StoreFeatureResponse(BaseModel):
"""Response model for a store feature."""
featureCode: str
label: Dict[str, str]
icon: str
description: Dict[str, str] = {}
isActive: bool
canActivate: bool
instanceId: str | None = None
def _getRootMandateId(db) -> str | None:
"""Find the root mandate ID."""
mandates = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True})
if mandates:
return mandates[0].get("id")
return None
def _getStoreFeatures(catalogService) -> List[Dict[str, Any]]:
"""Get all features that are available in the store (have resource.store.* entries)."""
resourceObjects = catalogService.getResourceObjects()
storeFeatures = []
for obj in resourceObjects:
meta = obj.get("meta", {})
if meta.get("category") == "store":
featureCode = meta.get("featureCode")
if featureCode:
featureDef = catalogService.getFeatureDefinition(featureCode)
if featureDef:
storeFeatures.append(featureDef)
return storeFeatures
def _checkStorePermission(context: RequestContext, featureCode: str) -> bool:
"""Check if user has RBAC permission to activate a store feature."""
if context.hasSysAdminRole:
return True
resourceItem = f"resource.store.{featureCode}"
dbApp = getRootDbAppConnector()
rbacInstance = RbacClass(dbApp, dbApp=dbApp)
permissions = rbacInstance.getUserPermissions(
context.user,
AccessRuleContext.RESOURCE,
resourceItem,
mandateId=str(context.mandateId) if context.mandateId else None,
)
return permissions.view
def _findSharedInstance(db, rootMandateId: str, featureCode: str) -> Dict[str, Any] | None:
"""Find the shared instance for a feature in the root mandate."""
instances = db.getRecordset(
FeatureInstance,
recordFilter={"mandateId": rootMandateId, "featureCode": featureCode}
)
return instances[0] if instances else None
def _getUserFeatureAccess(db, userId: str, instanceId: str) -> Dict[str, Any] | None:
"""Check if user already has FeatureAccess for an instance."""
accesses = db.getRecordset(
FeatureAccess,
recordFilter={"userId": userId, "featureInstanceId": instanceId}
)
return accesses[0] if accesses else None
def _findUserRole(rootInterface, instanceId: str, featureCode: str) -> str | None:
"""Find the user-level role for a feature instance."""
instanceRoles = rootInterface.getRolesByFeatureInstance(instanceId)
userRoleLabel = f"{featureCode}-user"
for role in instanceRoles:
if role.roleLabel == userRoleLabel:
return str(role.id)
for role in instanceRoles:
if "user" in role.roleLabel.lower() and "admin" not in role.roleLabel.lower():
return str(role.id)
return None
@router.get("/features", response_model=List[StoreFeatureResponse])
@limiter.limit("60/minute")
def listStoreFeatures(
request: Request,
context: RequestContext = Depends(getRequestContext)
) -> List[StoreFeatureResponse]:
"""
List all store features with activation status and permissions.
Returns the store catalog showing which features are available,
which are already activated, and whether the user can activate them.
"""
try:
rootInterface = getRootInterface()
db = rootInterface.db
catalogService = getCatalogService()
rootMandateId = _getRootMandateId(db)
if not rootMandateId:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Root mandate not found"
)
storeFeatures = _getStoreFeatures(catalogService)
userId = str(context.user.id)
result = []
for featureDef in storeFeatures:
featureCode = featureDef["code"]
sharedInstance = _findSharedInstance(db, rootMandateId, featureCode)
instanceId = sharedInstance.get("id") if sharedInstance else None
isActive = False
if instanceId:
existingAccess = _getUserFeatureAccess(db, userId, instanceId)
isActive = existingAccess is not None
canActivate = _checkStorePermission(context, featureCode) and not isActive
result.append(StoreFeatureResponse(
featureCode=featureCode,
label=featureDef.get("label", {}),
icon=featureDef.get("icon", "mdi-puzzle"),
isActive=isActive,
canActivate=canActivate,
instanceId=instanceId,
))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing store features: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to list store features: {str(e)}"
)
@router.post("/activate", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def activateStoreFeature(
request: Request,
data: StoreActivateRequest,
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Activate a store feature for the current user.
Creates FeatureAccess + FeatureAccessRole on the shared instance
in the root mandate. The user gets the feature's user-level role.
"""
featureCode = data.featureCode
userId = str(context.user.id)
try:
rootInterface = getRootInterface()
db = rootInterface.db
if not _checkStorePermission(context, featureCode):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"No permission to activate feature '{featureCode}'"
)
catalogService = getCatalogService()
featureDef = catalogService.getFeatureDefinition(featureCode)
if not featureDef:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Feature '{featureCode}' not found"
)
rootMandateId = _getRootMandateId(db)
if not rootMandateId:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Root mandate not found"
)
sharedInstance = _findSharedInstance(db, rootMandateId, featureCode)
if not sharedInstance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Shared instance for '{featureCode}' not found in root mandate"
)
instanceId = sharedInstance.get("id")
existingAccess = _getUserFeatureAccess(db, userId, instanceId)
if existingAccess:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Feature '{featureCode}' is already active"
)
featureAccess = FeatureAccess(
userId=userId,
featureInstanceId=instanceId,
enabled=True
)
createdAccess = db.recordCreate(FeatureAccess, featureAccess.model_dump())
featureAccessId = createdAccess.get("id")
userRoleId = _findUserRole(rootInterface, instanceId, featureCode)
if userRoleId:
featureAccessRole = FeatureAccessRole(
featureAccessId=featureAccessId,
roleId=userRoleId
)
db.recordCreate(FeatureAccessRole, featureAccessRole.model_dump())
logger.info(
f"User {userId} activated store feature '{featureCode}' "
f"(instance={instanceId}, role={userRoleId})"
)
return {
"featureCode": featureCode,
"instanceId": instanceId,
"featureAccessId": featureAccessId,
"roleId": userRoleId,
"activated": True,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error activating store feature '{featureCode}': {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to activate feature: {str(e)}"
)
@router.post("/deactivate", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def deactivateStoreFeature(
request: Request,
data: StoreActivateRequest,
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Deactivate a store feature for the current user.
Removes FeatureAccess (CASCADE deletes FeatureAccessRole).
User loses access immediately.
"""
featureCode = data.featureCode
userId = str(context.user.id)
try:
rootInterface = getRootInterface()
db = rootInterface.db
rootMandateId = _getRootMandateId(db)
if not rootMandateId:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Root mandate not found"
)
sharedInstance = _findSharedInstance(db, rootMandateId, featureCode)
if not sharedInstance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Shared instance for '{featureCode}' not found"
)
instanceId = sharedInstance.get("id")
existingAccess = _getUserFeatureAccess(db, userId, instanceId)
if not existingAccess:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Feature '{featureCode}' is not active"
)
featureAccessId = existingAccess.get("id")
db.recordDelete(FeatureAccess, featureAccessId)
logger.info(f"User {userId} deactivated store feature '{featureCode}' (instance={instanceId})")
return {
"featureCode": featureCode,
"instanceId": instanceId,
"deactivated": True,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deactivating store feature '{featureCode}': {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to deactivate feature: {str(e)}"
)