fixes for roles

This commit is contained in:
patrick-motsch 2026-02-23 17:13:50 +01:00
parent f35a90e428
commit 9d129f111f
9 changed files with 449 additions and 1055 deletions

3
app.py
View file

@ -537,6 +537,9 @@ app.include_router(messagingRouter)
from modules.routes.routeAdminFeatures import router as featuresAdminRouter from modules.routes.routeAdminFeatures import router as featuresAdminRouter
app.include_router(featuresAdminRouter) app.include_router(featuresAdminRouter)
from modules.routes.routeStore import router as storeRouter
app.include_router(storeRouter)
from modules.routes.routeInvitations import router as invitationsRouter from modules.routes.routeInvitations import router as invitationsRouter
app.include_router(invitationsRouter) app.include_router(invitationsRouter)

View file

@ -90,6 +90,20 @@ TEMPLATE_ROLES = [
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "n"}, {"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "n"},
] ]
}, },
{
"roleLabel": "automation-user",
"description": {
"en": "Automation User - Create and manage own automations",
"de": "Automatisierungs-Benutzer - Eigene Automatisierungen erstellen und verwalten",
"fr": "Utilisateur automatisation - Créer et gérer ses propres automatisations"
},
"accessRules": [
{"context": "UI", "item": "ui.feature.automation.definitions", "view": True},
{"context": "UI", "item": "ui.feature.automation.templates", "view": True},
{"context": "UI", "item": "ui.feature.automation.logs", "view": True},
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"},
]
},
{ {
"roleLabel": "automation-viewer", "roleLabel": "automation-viewer",
"description": { "description": {

View file

@ -131,7 +131,8 @@ def getFeatureDefinition() -> Dict[str, Any]:
return { return {
"code": FEATURE_CODE, "code": FEATURE_CODE,
"label": FEATURE_LABEL, "label": FEATURE_LABEL,
"icon": FEATURE_ICON "icon": FEATURE_ICON,
"autoCreateInstance": True,
} }

View file

@ -1824,6 +1824,9 @@ def _createResourceContextRules(db: DatabaseConnector) -> None:
# Create AICore provider RBAC rules # Create AICore provider RBAC rules
_createAicoreProviderRules(db) _createAicoreProviderRules(db)
# Create Store resource RBAC rules
_createStoreResourceRules(db)
def _createAicoreProviderRules(db: DatabaseConnector) -> None: def _createAicoreProviderRules(db: DatabaseConnector) -> None:
""" """
@ -1914,6 +1917,55 @@ def _createAicoreProviderRules(db: DatabaseConnector) -> None:
logger.warning(f"Failed to create AICore provider RBAC rules: {e}") logger.warning(f"Failed to create AICore provider RBAC rules: {e}")
def _createStoreResourceRules(db: DatabaseConnector) -> None:
"""
Create RBAC rules for Store feature activation resources.
Store resources control which roles can activate features via the Store.
- admin/user: view=True (can see and activate store features)
- viewer: no store access
- sysadmin: covered by generic RESOURCE rule (item=None, view=True)
Args:
db: Database connector instance
"""
storeResources = [
"resource.store.automation",
"resource.store.chatplayground",
"resource.store.teamsbot",
]
storeRules = []
for roleLabel in ["admin", "user"]:
roleId = _getRoleId(db, roleLabel)
if not roleId:
continue
for resourceKey in storeResources:
existingRules = db.getRecordset(
AccessRule,
recordFilter={
"roleId": roleId,
"context": AccessRuleContext.RESOURCE.value,
"item": resourceKey
}
)
if not existingRules:
storeRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.RESOURCE,
item=resourceKey,
view=True,
read=None, create=None, update=None, delete=None,
))
for rule in storeRules:
db.recordCreate(AccessRule, rule)
if storeRules:
logger.info(f"Created {len(storeRules)} Store resource RBAC rules")
def initRootMandateBilling(mandateId: str) -> None: def initRootMandateBilling(mandateId: str) -> None:
""" """
Initialize billing settings for root mandate. Initialize billing settings for root mandate.

File diff suppressed because it is too large Load diff

View file

@ -873,9 +873,14 @@ def list_roles(
if role.featureInstanceId is not None: if role.featureInstanceId is not None:
continue continue
# Include global roles (mandateId=None) OR mandate-specific roles if mandateId matches # When mandateId requested: only mandate-scoped roles
if role.mandateId is not None and (mandateId is None or role.mandateId != mandateId): # When no mandateId: only global roles (mandateId=None)
continue if mandateId:
if role.mandateId != mandateId:
continue
else:
if role.mandateId is not None:
continue
# Filter: Exclude feature template roles unless includeTemplates=True # Filter: Exclude feature template roles unless includeTemplates=True
if not includeTemplates and role.featureCode is not None: if not includeTemplates and role.featureCode is not None:
@ -978,47 +983,6 @@ def list_roles(
) )
@router.get("/roles/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_role_options(
request: Request,
currentUser: User = Depends(requireSysAdminRole)
) -> List[Dict[str, Any]]:
"""
Get role options for select dropdowns.
MULTI-TENANT: SysAdmin-only.
Returns:
- List of role option dictionaries with value and label
"""
try:
interface = getRootInterface()
# Get all roles from database
dbRoles = interface.getAllRoles()
# Convert to options format
options = []
for role in dbRoles:
# Use English description as label, fallback to roleLabel
label = role.description.get("en", role.roleLabel) if isinstance(role.description, dict) else role.roleLabel
options.append({
"value": role.roleLabel,
"label": label
})
return options
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting role options: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get role options: {str(e)}"
)
@router.post("/roles", response_model=Dict[str, Any]) @router.post("/roles", response_model=Dict[str, Any])
@limiter.limit("30/minute") @limiter.limit("30/minute")
def create_role( def create_role(

View file

@ -540,21 +540,13 @@ def add_user_to_mandate(
""" """
Add a user to a mandate with specified roles. Add a user to a mandate with specified roles.
Requires Mandate-Admin role. Requires Mandate-Admin role (SysAdmin passes automatically).
SysAdmin cannot add themselves (Self-Eskalation Prevention).
Args: Args:
targetMandateId: Target mandate ID targetMandateId: Target mandate ID
data: User ID and role IDs to assign data: User ID and role IDs to assign
""" """
# 1. SysAdmin Self-Eskalation Prevention # Check Mandate-Admin permission
if context.hasSysAdminRole and data.targetUserId == str(context.user.id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="SysAdmin cannot add themselves to a mandate. A Mandate-Admin must grant access."
)
# 2. Check Mandate-Admin permission
if not _hasMandateAdminRole(context, targetMandateId): if not _hasMandateAdminRole(context, targetMandateId):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,

View file

@ -0,0 +1,344 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Feature Store routes.
Allows users to self-activate features in the root mandate's shared instances.
Architecture: Shared Instance Pattern
- Each store feature has exactly 1 instance in the root mandate (created at bootstrap)
- Users activate by getting FeatureAccess + user-role on the shared instance
- Data isolation is guaranteed by read="m" (WHERE _createdBy = userId)
"""
from fastapi import APIRouter, HTTPException, Depends, Request
from typing import List, Dict, Any
from fastapi import status
import logging
from pydantic import BaseModel, Field
from modules.auth import limiter, getRequestContext, RequestContext
from modules.datamodels.datamodelFeatures import FeatureInstance
from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.datamodels.datamodelUam import Mandate
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.security.rbacCatalog import getCatalogService
from modules.security.rbac import RbacClass
from modules.security.rootAccess import getRootDbAppConnector
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/store",
tags=["Store"],
responses={404: {"description": "Not found"}}
)
class StoreActivateRequest(BaseModel):
"""Request model for activating a store feature."""
featureCode: str = Field(..., description="Feature code to activate (e.g., 'automation')")
class StoreFeatureResponse(BaseModel):
"""Response model for a store feature."""
featureCode: str
label: Dict[str, str]
icon: str
description: Dict[str, str] = {}
isActive: bool
canActivate: bool
instanceId: str | None = None
def _getRootMandateId(db) -> str | None:
"""Find the root mandate ID."""
mandates = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True})
if mandates:
return mandates[0].get("id")
return None
def _getStoreFeatures(catalogService) -> List[Dict[str, Any]]:
"""Get all features that are available in the store (have resource.store.* entries)."""
resourceObjects = catalogService.getResourceObjects()
storeFeatures = []
for obj in resourceObjects:
meta = obj.get("meta", {})
if meta.get("category") == "store":
featureCode = meta.get("featureCode")
if featureCode:
featureDef = catalogService.getFeatureDefinition(featureCode)
if featureDef:
storeFeatures.append(featureDef)
return storeFeatures
def _checkStorePermission(context: RequestContext, featureCode: str) -> bool:
"""Check if user has RBAC permission to activate a store feature."""
if context.hasSysAdminRole:
return True
resourceItem = f"resource.store.{featureCode}"
dbApp = getRootDbAppConnector()
rbacInstance = RbacClass(dbApp, dbApp=dbApp)
permissions = rbacInstance.getUserPermissions(
context.user,
AccessRuleContext.RESOURCE,
resourceItem,
mandateId=str(context.mandateId) if context.mandateId else None,
)
return permissions.view
def _findSharedInstance(db, rootMandateId: str, featureCode: str) -> Dict[str, Any] | None:
"""Find the shared instance for a feature in the root mandate."""
instances = db.getRecordset(
FeatureInstance,
recordFilter={"mandateId": rootMandateId, "featureCode": featureCode}
)
return instances[0] if instances else None
def _getUserFeatureAccess(db, userId: str, instanceId: str) -> Dict[str, Any] | None:
"""Check if user already has FeatureAccess for an instance."""
accesses = db.getRecordset(
FeatureAccess,
recordFilter={"userId": userId, "featureInstanceId": instanceId}
)
return accesses[0] if accesses else None
def _findUserRole(rootInterface, instanceId: str, featureCode: str) -> str | None:
"""Find the user-level role for a feature instance."""
instanceRoles = rootInterface.getRolesByFeatureInstance(instanceId)
userRoleLabel = f"{featureCode}-user"
for role in instanceRoles:
if role.roleLabel == userRoleLabel:
return str(role.id)
for role in instanceRoles:
if "user" in role.roleLabel.lower() and "admin" not in role.roleLabel.lower():
return str(role.id)
return None
@router.get("/features", response_model=List[StoreFeatureResponse])
@limiter.limit("60/minute")
def listStoreFeatures(
request: Request,
context: RequestContext = Depends(getRequestContext)
) -> List[StoreFeatureResponse]:
"""
List all store features with activation status and permissions.
Returns the store catalog showing which features are available,
which are already activated, and whether the user can activate them.
"""
try:
rootInterface = getRootInterface()
db = rootInterface.db
catalogService = getCatalogService()
rootMandateId = _getRootMandateId(db)
if not rootMandateId:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Root mandate not found"
)
storeFeatures = _getStoreFeatures(catalogService)
userId = str(context.user.id)
result = []
for featureDef in storeFeatures:
featureCode = featureDef["code"]
sharedInstance = _findSharedInstance(db, rootMandateId, featureCode)
instanceId = sharedInstance.get("id") if sharedInstance else None
isActive = False
if instanceId:
existingAccess = _getUserFeatureAccess(db, userId, instanceId)
isActive = existingAccess is not None
canActivate = _checkStorePermission(context, featureCode) and not isActive
result.append(StoreFeatureResponse(
featureCode=featureCode,
label=featureDef.get("label", {}),
icon=featureDef.get("icon", "mdi-puzzle"),
isActive=isActive,
canActivate=canActivate,
instanceId=instanceId,
))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing store features: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to list store features: {str(e)}"
)
@router.post("/activate", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def activateStoreFeature(
request: Request,
data: StoreActivateRequest,
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Activate a store feature for the current user.
Creates FeatureAccess + FeatureAccessRole on the shared instance
in the root mandate. The user gets the feature's user-level role.
"""
featureCode = data.featureCode
userId = str(context.user.id)
try:
rootInterface = getRootInterface()
db = rootInterface.db
if not _checkStorePermission(context, featureCode):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"No permission to activate feature '{featureCode}'"
)
catalogService = getCatalogService()
featureDef = catalogService.getFeatureDefinition(featureCode)
if not featureDef:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Feature '{featureCode}' not found"
)
rootMandateId = _getRootMandateId(db)
if not rootMandateId:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Root mandate not found"
)
sharedInstance = _findSharedInstance(db, rootMandateId, featureCode)
if not sharedInstance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Shared instance for '{featureCode}' not found in root mandate"
)
instanceId = sharedInstance.get("id")
existingAccess = _getUserFeatureAccess(db, userId, instanceId)
if existingAccess:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Feature '{featureCode}' is already active"
)
featureAccess = FeatureAccess(
userId=userId,
featureInstanceId=instanceId,
enabled=True
)
createdAccess = db.recordCreate(FeatureAccess, featureAccess.model_dump())
featureAccessId = createdAccess.get("id")
userRoleId = _findUserRole(rootInterface, instanceId, featureCode)
if userRoleId:
featureAccessRole = FeatureAccessRole(
featureAccessId=featureAccessId,
roleId=userRoleId
)
db.recordCreate(FeatureAccessRole, featureAccessRole.model_dump())
logger.info(
f"User {userId} activated store feature '{featureCode}' "
f"(instance={instanceId}, role={userRoleId})"
)
return {
"featureCode": featureCode,
"instanceId": instanceId,
"featureAccessId": featureAccessId,
"roleId": userRoleId,
"activated": True,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error activating store feature '{featureCode}': {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to activate feature: {str(e)}"
)
@router.post("/deactivate", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def deactivateStoreFeature(
request: Request,
data: StoreActivateRequest,
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Deactivate a store feature for the current user.
Removes FeatureAccess (CASCADE deletes FeatureAccessRole).
User loses access immediately.
"""
featureCode = data.featureCode
userId = str(context.user.id)
try:
rootInterface = getRootInterface()
db = rootInterface.db
rootMandateId = _getRootMandateId(db)
if not rootMandateId:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Root mandate not found"
)
sharedInstance = _findSharedInstance(db, rootMandateId, featureCode)
if not sharedInstance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Shared instance for '{featureCode}' not found"
)
instanceId = sharedInstance.get("id")
existingAccess = _getUserFeatureAccess(db, userId, instanceId)
if not existingAccess:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Feature '{featureCode}' is not active"
)
featureAccessId = existingAccess.get("id")
db.recordDelete(FeatureAccess, featureAccessId)
logger.info(f"User {userId} deactivated store feature '{featureCode}' (instance={instanceId})")
return {
"featureCode": featureCode,
"instanceId": instanceId,
"deactivated": True,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deactivating store feature '{featureCode}': {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to deactivate feature: {str(e)}"
)

View file

@ -49,6 +49,15 @@ NAVIGATION_SECTIONS = [
"order": 10, "order": 10,
"public": True, "public": True,
}, },
{
"id": "store",
"objectKey": "ui.system.store",
"label": {"en": "Store", "de": "Store", "fr": "Store"},
"icon": "FaStore",
"path": "/store",
"order": 15,
"public": True,
},
{ {
"id": "settings", "id": "settings",
"objectKey": "ui.system.settings", "objectKey": "ui.system.settings",
@ -423,6 +432,21 @@ DATA_OBJECTS = [
# ============================================================================= # =============================================================================
RESOURCE_OBJECTS = [ RESOURCE_OBJECTS = [
{
"objectKey": "resource.store.automation",
"label": {"en": "Store: Automation", "de": "Store: Automation", "fr": "Store: Automatisation"},
"meta": {"category": "store", "featureCode": "automation"}
},
{
"objectKey": "resource.store.chatplayground",
"label": {"en": "Store: Chat Playground", "de": "Store: Chat Playground", "fr": "Store: Chat Playground"},
"meta": {"category": "store", "featureCode": "chatplayground"}
},
{
"objectKey": "resource.store.teamsbot",
"label": {"en": "Store: Teams Bot", "de": "Store: Teams Bot", "fr": "Store: Teams Bot"},
"meta": {"category": "store", "featureCode": "teamsbot"}
},
{ {
"objectKey": "resource.system.api.auth", "objectKey": "resource.system.api.auth",
"label": {"en": "Authentication API", "de": "Authentifizierungs-API", "fr": "API d'authentification"}, "label": {"en": "Authentication API", "de": "Authentifizierungs-API", "fr": "API d'authentification"},