From 9d129f111f539b5356fbf8095fb4f06fd2451c51 Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Mon, 23 Feb 2026 17:13:50 +0100 Subject: [PATCH] fixes for roles --- app.py | 3 + modules/features/automation/mainAutomation.py | 14 + modules/features/teamsbot/mainTeamsbot.py | 3 +- modules/interfaces/interfaceBootstrap.py | 52 + modules/routes/routeAdminRbacRoles.py | 1000 ----------------- modules/routes/routeAdminRbacRules.py | 52 +- modules/routes/routeDataMandates.py | 12 +- modules/routes/routeStore.py | 344 ++++++ modules/system/mainSystem.py | 24 + 9 files changed, 449 insertions(+), 1055 deletions(-) delete mode 100644 modules/routes/routeAdminRbacRoles.py create mode 100644 modules/routes/routeStore.py diff --git a/app.py b/app.py index 0033b4a1..540d4e4d 100644 --- a/app.py +++ b/app.py @@ -537,6 +537,9 @@ app.include_router(messagingRouter) from modules.routes.routeAdminFeatures import router as featuresAdminRouter app.include_router(featuresAdminRouter) +from modules.routes.routeStore import router as storeRouter +app.include_router(storeRouter) + from modules.routes.routeInvitations import router as invitationsRouter app.include_router(invitationsRouter) diff --git a/modules/features/automation/mainAutomation.py b/modules/features/automation/mainAutomation.py index e23b320f..239b1c15 100644 --- a/modules/features/automation/mainAutomation.py +++ b/modules/features/automation/mainAutomation.py @@ -90,6 +90,20 @@ TEMPLATE_ROLES = [ {"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "n"}, ] }, + { + "roleLabel": "automation-user", + "description": { + "en": "Automation User - Create and manage own automations", + "de": "Automatisierungs-Benutzer - Eigene Automatisierungen erstellen und verwalten", + "fr": "Utilisateur automatisation - Créer et gérer ses propres automatisations" + }, + "accessRules": [ + {"context": "UI", "item": "ui.feature.automation.definitions", "view": True}, + {"context": "UI", "item": "ui.feature.automation.templates", "view": True}, + {"context": "UI", "item": "ui.feature.automation.logs", "view": True}, + {"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"}, + ] + }, { "roleLabel": "automation-viewer", "description": { diff --git a/modules/features/teamsbot/mainTeamsbot.py b/modules/features/teamsbot/mainTeamsbot.py index 6dd161e0..97cc107e 100644 --- a/modules/features/teamsbot/mainTeamsbot.py +++ b/modules/features/teamsbot/mainTeamsbot.py @@ -131,7 +131,8 @@ def getFeatureDefinition() -> Dict[str, Any]: return { "code": FEATURE_CODE, "label": FEATURE_LABEL, - "icon": FEATURE_ICON + "icon": FEATURE_ICON, + "autoCreateInstance": True, } diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 141f7da2..ae6ef2e5 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -1823,6 +1823,9 @@ def _createResourceContextRules(db: DatabaseConnector) -> None: # Create AICore provider RBAC rules _createAicoreProviderRules(db) + + # Create Store resource RBAC rules + _createStoreResourceRules(db) def _createAicoreProviderRules(db: DatabaseConnector) -> None: @@ -1914,6 +1917,55 @@ def _createAicoreProviderRules(db: DatabaseConnector) -> None: logger.warning(f"Failed to create AICore provider RBAC rules: {e}") +def _createStoreResourceRules(db: DatabaseConnector) -> None: + """ + Create RBAC rules for Store feature activation resources. + + Store resources control which roles can activate features via the Store. + - admin/user: view=True (can see and activate store features) + - viewer: no store access + - sysadmin: covered by generic RESOURCE rule (item=None, view=True) + + Args: + db: Database connector instance + """ + storeResources = [ + "resource.store.automation", + "resource.store.chatplayground", + "resource.store.teamsbot", + ] + + storeRules = [] + + for roleLabel in ["admin", "user"]: + roleId = _getRoleId(db, roleLabel) + if not roleId: + continue + for resourceKey in storeResources: + existingRules = db.getRecordset( + AccessRule, + recordFilter={ + "roleId": roleId, + "context": AccessRuleContext.RESOURCE.value, + "item": resourceKey + } + ) + if not existingRules: + storeRules.append(AccessRule( + roleId=roleId, + context=AccessRuleContext.RESOURCE, + item=resourceKey, + view=True, + read=None, create=None, update=None, delete=None, + )) + + for rule in storeRules: + db.recordCreate(AccessRule, rule) + + if storeRules: + logger.info(f"Created {len(storeRules)} Store resource RBAC rules") + + def initRootMandateBilling(mandateId: str) -> None: """ Initialize billing settings for root mandate. diff --git a/modules/routes/routeAdminRbacRoles.py b/modules/routes/routeAdminRbacRoles.py deleted file mode 100644 index c3cf655c..00000000 --- a/modules/routes/routeAdminRbacRoles.py +++ /dev/null @@ -1,1000 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Admin RBAC Roles Management routes. -Provides endpoints for managing roles and role assignments to users. - -MULTI-TENANT: Context-aware access control. -- SysAdmin: Full access to all roles and assignments across all mandates. -- MandateAdmin: Can manage roles and assignments within their own mandates. - Template roles (mandateId=None, isSystemRole=True) are read-only. - The sysadmin role (roleLabel="sysadmin") is not manageable by MandateAdmins. -Role assignments are managed via UserMandateRole (not User.roleLabels). -""" - -from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request, status -from typing import List, Dict, Any, Optional, Set -import logging - -from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext -from modules.datamodels.datamodelUam import User, UserInDB -from modules.datamodels.datamodelRbac import Role -from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole -from modules.interfaces.interfaceDbApp import getInterface, getRootInterface - -# Configure logger -logger = logging.getLogger(__name__) - - -def _getUserRoleLabels(interface, userId: str) -> List[str]: - """ - Get role labels for a user from UserMandateRole (across all mandates). - - Args: - interface: Database interface - userId: User ID - - Returns: - List of role labels - """ - roleLabels: Set[str] = set() - - # Get all UserMandate records for this user (Pydantic models) - userMandates = interface.getUserMandates(userId) - - for um in userMandates: - # Get all UserMandateRole records for this membership (Pydantic models) - userMandateRoles = interface.getUserMandateRoles(str(um.id)) - - for umr in userMandateRoles: - if umr.roleId: - # Get role by ID to get roleLabel - role = interface.getRole(str(umr.roleId)) - if role: - roleLabels.add(role.roleLabel) - - return list(roleLabels) - - -def _hasRoleLabel(interface, userId: str, roleLabel: str) -> bool: - """ - Check if user has a specific role label (across all mandates). - """ - return roleLabel in _getUserRoleLabels(interface, userId) - - -def _getAdminMandateIds(context: RequestContext) -> List[str]: - """Get mandate IDs where the user has admin role.""" - mandateIds = [] - try: - rootInterface = getRootInterface() - userMandates = rootInterface.getUserMandates(str(context.user.id)) - for um in userMandates: - if not getattr(um, 'enabled', True): - continue - umId = getattr(um, 'id', None) - mandateId = getattr(um, 'mandateId', None) - if not umId or not mandateId: - continue - roleIds = rootInterface.getRoleIdsForUserMandate(str(umId)) - for roleId in roleIds: - role = rootInterface.getRole(roleId) - if role and role.roleLabel == "admin" and not role.featureInstanceId: - mandateIds.append(str(mandateId)) - break - except Exception as e: - logger.error(f"Error getting admin mandate IDs: {e}") - return mandateIds - - -router = APIRouter( - prefix="/api/admin/rbac/roles", - tags=["Admin RBAC Roles"], - responses={404: {"description": "Not found"}} -) - - -@router.get("/", response_model=List[Dict[str, Any]]) -@limiter.limit("60/minute") -def list_roles( - request: Request, - mandateId: Optional[str] = Query(None, description="Filter roles by mandate ID"), - context: RequestContext = Depends(getRequestContext) -) -> List[Dict[str, Any]]: - """ - Get list of roles with metadata. - - Context-aware: SysAdmin sees all roles. MandateAdmin sees roles from own mandates - plus template roles (read-only). - - Without mandateId: returns system template roles (mandateId=NULL). - With mandateId: returns mandate-level roles for that mandate (featureInstanceId=NULL). - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get roles filtered by scope - print(f"[DEBUG list_roles] mandateId={mandateId}") - if mandateId: - # MandateAdmin can only query mandates they admin - if not isSysAdmin and mandateId not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") - # Mandate-specific roles (mandate-level only, no feature-instance roles) - dbRoles = interface.getRolesForMandate(mandateId) - print(f"[DEBUG list_roles] getRolesForMandate returned {len(dbRoles)} roles") - else: - # System template roles only - dbRoles = interface.getAllRoles() - print(f"[DEBUG list_roles] getAllRoles returned {len(dbRoles)} roles") - # MandateAdmin: filter to template roles + roles from own mandates - if not isSysAdmin: - dbRoles = [r for r in dbRoles if r.mandateId is None or str(r.mandateId) in adminMandateIds] - - # Count role assignments from UserMandateRole table - roleCounts = interface.countRoleAssignments() - - # Convert Role objects to dictionaries and add user counts - result = [] - for role in dbRoles: - result.append({ - "id": role.id, - "roleLabel": role.roleLabel, - "description": role.description, - "mandateId": role.mandateId, - "featureInstanceId": role.featureInstanceId, - "userCount": roleCounts.get(str(role.id), 0), - "isSystemRole": role.isSystemRole - }) - - return result - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error listing roles: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to list roles: {str(e)}" - ) - - -@router.get("/options", response_model=List[Dict[str, Any]]) -@limiter.limit("60/minute") -def get_role_options( - request: Request, - context: RequestContext = Depends(getRequestContext) -) -> List[Dict[str, Any]]: - """ - Get role options for select dropdowns. - Context-aware: SysAdmin sees all roles. MandateAdmin sees roles from own mandates - plus template roles. - - Returns: - - List of role option dictionaries with value and label - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get all roles from database - dbRoles = interface.getAllRoles() - - # MandateAdmin: filter to template roles + roles from own mandates - if not isSysAdmin: - dbRoles = [r for r in dbRoles if r.mandateId is None or str(r.mandateId) in adminMandateIds] - - # Convert to options format - options = [] - for role in dbRoles: - # Use English description as label, fallback to roleLabel - label = role.description.get("en", role.roleLabel) if isinstance(role.description, dict) else role.roleLabel - options.append({ - "value": role.roleLabel, - "label": label - }) - - return options - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting role options: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to get role options: {str(e)}" - ) - - -@router.post("/", response_model=Dict[str, Any]) -@limiter.limit("30/minute") -def create_role( - request: Request, - role: Role = Body(...), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Create a new role. - Context-aware: SysAdmin can create any role. MandateAdmin can create roles - within own mandates only (not template or sysadmin roles). - - Request Body: - - role: Role object to create - - Returns: - - Created role dictionary - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # MandateAdmin restrictions - if not isSysAdmin: - if role.roleLabel == "sysadmin": - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create sysadmin role") - if role.mandateId is None: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create template roles") - if str(role.mandateId) not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") - - createdRole = interface.createRole(role) - - return { - "id": createdRole.id, - "roleLabel": createdRole.roleLabel, - "description": createdRole.description, - "isSystemRole": createdRole.isSystemRole - } - - except HTTPException: - raise - except ValueError as e: - raise HTTPException( - status_code=400, - detail=str(e) - ) - except Exception as e: - logger.error(f"Error creating role: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to create role: {str(e)}" - ) - - -@router.get("/{roleId}", response_model=Dict[str, Any]) -@limiter.limit("60/minute") -def get_role( - request: Request, - roleId: str = Path(..., description="Role ID"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Get a role by ID. - Context-aware: SysAdmin sees all. MandateAdmin sees roles from own mandates - plus template roles (read-only). - - Path Parameters: - - roleId: Role ID - - Returns: - - Role dictionary - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - role = interface.getRole(roleId) - if not role: - raise HTTPException( - status_code=404, - detail=f"Role {roleId} not found" - ) - - # MandateAdmin: can view template roles (read-only) or own mandate roles - if not isSysAdmin: - if role.mandateId is not None and str(role.mandateId) not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role") - - return { - "id": role.id, - "roleLabel": role.roleLabel, - "description": role.description, - "isSystemRole": role.isSystemRole - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting role: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to get role: {str(e)}" - ) - - -@router.put("/{roleId}", response_model=Dict[str, Any]) -@limiter.limit("30/minute") -def update_role( - request: Request, - roleId: str = Path(..., description="Role ID"), - role: Role = Body(...), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Update an existing role. - Context-aware: SysAdmin can update any role. MandateAdmin can update roles - within own mandates only. Template roles and sysadmin role are blocked. - - Path Parameters: - - roleId: Role ID - - Request Body: - - role: Updated Role object - - Returns: - - Updated role dictionary - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # MandateAdmin restrictions: check existing role before updating - if not isSysAdmin: - existingRole = interface.getRole(roleId) - if not existingRole: - raise HTTPException(status_code=404, detail=f"Role {roleId} not found") - if existingRole.roleLabel == "sysadmin": - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify sysadmin role") - if existingRole.mandateId is None: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify template roles") - if str(existingRole.mandateId) not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role") - - updatedRole = interface.updateRole(roleId, role) - - return { - "id": updatedRole.id, - "roleLabel": updatedRole.roleLabel, - "description": updatedRole.description, - "isSystemRole": updatedRole.isSystemRole - } - - except HTTPException: - raise - except ValueError as e: - raise HTTPException( - status_code=400, - detail=str(e) - ) - except Exception as e: - logger.error(f"Error updating role: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to update role: {str(e)}" - ) - - -@router.delete("/{roleId}", response_model=Dict[str, str]) -@limiter.limit("30/minute") -def delete_role( - request: Request, - roleId: str = Path(..., description="Role ID"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, str]: - """ - Delete a role. - Context-aware: SysAdmin can delete any role. MandateAdmin can delete roles - within own mandates only. Template roles and sysadmin role are blocked. - - Path Parameters: - - roleId: Role ID - - Returns: - - Success message - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # MandateAdmin restrictions: check existing role before deleting - if not isSysAdmin: - existingRole = interface.getRole(roleId) - if not existingRole: - raise HTTPException(status_code=404, detail=f"Role {roleId} not found") - if existingRole.roleLabel == "sysadmin": - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete sysadmin role") - if existingRole.mandateId is None: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete template roles") - if str(existingRole.mandateId) not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role") - - success = interface.deleteRole(roleId) - if not success: - raise HTTPException( - status_code=404, - detail=f"Role {roleId} not found" - ) - - return {"message": f"Role {roleId} deleted successfully"} - - except HTTPException: - raise - except ValueError as e: - raise HTTPException( - status_code=400, - detail=str(e) - ) - except Exception as e: - logger.error(f"Error deleting role: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to delete role: {str(e)}" - ) - - -@router.get("/users", response_model=List[Dict[str, Any]]) -@limiter.limit("60/minute") -def list_users_with_roles( - request: Request, - roleLabel: Optional[str] = Query(None, description="Filter by role label"), - mandateId: Optional[str] = Query(None, description="Filter by mandate ID (via UserMandate)"), - context: RequestContext = Depends(getRequestContext) -) -> List[Dict[str, Any]]: - """ - Get list of users with their role assignments. - Context-aware: SysAdmin sees all users. MandateAdmin sees users from own mandates only. - - Query Parameters: - - roleLabel: Optional filter by role label - - mandateId: Optional filter by mandate ID (via UserMandate table) - - Returns: - - List of user dictionaries with role assignments - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get all users via interface method (Pydantic models) - users = interface.getAllUsers() - - # Filter by mandate if specified (via UserMandate table) - if mandateId: - # MandateAdmin can only query mandates they admin - if not isSysAdmin and mandateId not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") - userMandates = interface.getUserMandatesByMandate(mandateId) - mandateUserIds = {str(um.userId) for um in userMandates} - users = [u for u in users if str(u.id) in mandateUserIds] - elif not isSysAdmin: - # MandateAdmin without mandateId filter: restrict to users in admin's mandates - allowedUserIds: Set[str] = set() - for mId in adminMandateIds: - userMandates = interface.getUserMandatesByMandate(mId) - for um in userMandates: - allowedUserIds.add(str(um.userId)) - users = [u for u in users if str(u.id) in allowedUserIds] - - # Filter by role if specified (via UserMandateRole) - if roleLabel: - users = [u for u in users if _hasRoleLabel(interface, str(u.id), roleLabel)] - - # Format response - result = [] - for user in users: - userRoleLabels = _getUserRoleLabels(interface, str(user.id)) - result.append({ - "id": user.id, - "username": user.username, - "email": user.email, - "fullName": user.fullName, - "isSysAdmin": user.isSysAdmin, - "enabled": user.enabled, - "roleLabels": userRoleLabels, - "roleCount": len(userRoleLabels) - }) - - return result - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error listing users with roles: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to list users with roles: {str(e)}" - ) - - -@router.get("/users/{userId}", response_model=Dict[str, Any]) -@limiter.limit("60/minute") -def get_user_roles( - request: Request, - userId: str = Path(..., description="User ID"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Get role assignments for a specific user. - Context-aware: SysAdmin sees all. MandateAdmin can view users in own mandates only. - - Path Parameters: - - userId: User ID - - Returns: - - User dictionary with role assignments - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get user - user = interface.getUser(userId) - if not user: - raise HTTPException( - status_code=404, - detail=f"User {userId} not found" - ) - - # MandateAdmin: check user is in one of admin's mandates - if not isSysAdmin: - userMandates = interface.getUserMandates(userId) - userMandateMandateIds = {str(um.mandateId) for um in userMandates} - if not userMandateMandateIds.intersection(adminMandateIds): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user") - - userRoleLabels = _getUserRoleLabels(interface, str(user.id)) - return { - "id": user.id, - "username": user.username, - "email": user.email, - "fullName": user.fullName, - "isSysAdmin": user.isSysAdmin, - "enabled": user.enabled, - "roleLabels": userRoleLabels, - "roleCount": len(userRoleLabels) - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting user roles: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to get user roles: {str(e)}" - ) - - -@router.put("/users/{userId}/roles", response_model=Dict[str, Any]) -@limiter.limit("30/minute") -def update_user_roles( - request: Request, - userId: str = Path(..., description="User ID"), - newRoleLabels: List[str] = Body(..., description="List of role labels to assign"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Update role assignments for a specific user. - Context-aware: SysAdmin can update any user's roles. MandateAdmin can update roles - for users in own mandates only. Cannot assign sysadmin role. - - Path Parameters: - - userId: User ID - - Request Body: - - newRoleLabels: List of role labels to assign (e.g., ["admin", "user"]) - - Returns: - - Updated user dictionary with role assignments - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get user - user = interface.getUser(userId) - if not user: - raise HTTPException( - status_code=404, - detail=f"User {userId} not found" - ) - - # MandateAdmin restrictions - if not isSysAdmin: - if "sysadmin" in newRoleLabels: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign sysadmin role") - - # Validate role labels (basic validation - check against standard roles) - standardRoles = ["sysadmin", "admin", "user", "viewer"] - for roleLabel in newRoleLabels: - if roleLabel not in standardRoles: - logger.warning(f"Non-standard role label assigned: {roleLabel}") - - # Get user's first mandate (for role assignment) - userMandates = interface.getUserMandates(userId) - if not userMandates: - raise HTTPException( - status_code=400, - detail=f"User {userId} has no mandate memberships. Add to mandate first." - ) - - userMandateId = str(userMandates[0].id) - targetMandateId = str(userMandates[0].mandateId) - - # MandateAdmin: check target mandate belongs to admin's mandates - if not isSysAdmin: - if targetMandateId not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") - - # Get current roles for this mandate (Pydantic models) - existingRoles = interface.getUserMandateRoles(userMandateId) - existingRoleIds = {str(r.roleId) for r in existingRoles} - - # Convert roleLabels to roleIds - use mandate-scoped lookup to get instance roles - # (prevents assigning template roles instead of mandate-instance roles) - newRoleIds = set() - for roleLabel in newRoleLabels: - role = interface.getRoleByLabelAndScope(roleLabel, mandateId=targetMandateId) - if not role: - logger.warning(f"Role '{roleLabel}' not found for mandate {targetMandateId}, skipping") - continue - newRoleIds.add(str(role.id)) - - # Remove roles that are no longer needed - for existingRole in existingRoles: - if str(existingRole.roleId) not in newRoleIds: - interface.removeRoleFromUserMandate(userMandateId, str(existingRole.roleId)) - - # Add new roles - for roleId in newRoleIds: - if roleId not in existingRoleIds: - newRole = UserMandateRole(userMandateId=userMandateId, roleId=roleId) - interface.db.recordCreate(UserMandateRole, newRole.model_dump()) - - logger.info(f"Updated roles for user {userId}: {newRoleLabels} by admin {currentUser.id}") - - userRoleLabels = _getUserRoleLabels(interface, userId) - return { - "id": user.id, - "username": user.username, - "email": user.email, - "fullName": user.fullName, - "isSysAdmin": user.isSysAdmin, - "enabled": user.enabled, - "roleLabels": userRoleLabels, - "roleCount": len(userRoleLabels) - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating user roles: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to update user roles: {str(e)}" - ) - - -@router.post("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any]) -@limiter.limit("30/minute") -def add_user_role( - request: Request, - userId: str = Path(..., description="User ID"), - roleLabel: str = Path(..., description="Role label to add"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Add a role to a user (if not already assigned). - Context-aware: SysAdmin can add any role. MandateAdmin can add roles to users - in own mandates only. Cannot assign sysadmin role. - - Path Parameters: - - userId: User ID - - roleLabel: Role label to add - - Returns: - - Updated user dictionary with role assignments - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get user - user = interface.getUser(userId) - if not user: - raise HTTPException( - status_code=404, - detail=f"User {userId} not found" - ) - - # MandateAdmin restrictions - if not isSysAdmin: - if roleLabel == "sysadmin": - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign sysadmin role") - - # Get user's first mandate - userMandates = interface.getUserMandates(userId) - if not userMandates: - raise HTTPException( - status_code=400, - detail=f"User {userId} has no mandate memberships. Add to mandate first." - ) - - userMandateId = str(userMandates[0].id) - targetMandateId = str(userMandates[0].mandateId) - - # MandateAdmin: check target mandate belongs to admin's mandates - if not isSysAdmin: - if targetMandateId not in adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") - - # Get role by label - use mandate-scoped lookup to get instance role - # (prevents assigning template roles instead of mandate-instance roles) - role = interface.getRoleByLabelAndScope(roleLabel, mandateId=targetMandateId) - if not role: - raise HTTPException( - status_code=404, - detail=f"Role '{roleLabel}' not found for mandate {targetMandateId}" - ) - - # Check if role is already assigned - use interface method - existingRoles = interface.getUserMandateRoles(userMandateId) - roleAlreadyAssigned = any(str(r.roleId) == str(role.id) for r in existingRoles) - - if not roleAlreadyAssigned: - # Add the role via interface method - interface.addRoleToUserMandate(userMandateId, str(role.id)) - logger.info(f"Added role {roleLabel} to user {userId} by admin {currentUser.id}") - - userRoleLabels = _getUserRoleLabels(interface, userId) - return { - "id": user.id, - "username": user.username, - "email": user.email, - "fullName": user.fullName, - "isSysAdmin": user.isSysAdmin, - "enabled": user.enabled, - "roleLabels": userRoleLabels, - "roleCount": len(userRoleLabels) - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error adding role to user: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to add role to user: {str(e)}" - ) - - -@router.delete("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any]) -@limiter.limit("30/minute") -def remove_user_role( - request: Request, - userId: str = Path(..., description="User ID"), - roleLabel: str = Path(..., description="Role label to remove"), - context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """ - Remove a role from a user. - Context-aware: SysAdmin can remove any role. MandateAdmin can remove roles from - users in own mandates only. Cannot remove sysadmin role. - - Path Parameters: - - userId: User ID - - roleLabel: Role label to remove - - Returns: - - Updated user dictionary with role assignments - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get user - user = interface.getUser(userId) - if not user: - raise HTTPException( - status_code=404, - detail=f"User {userId} not found" - ) - - # MandateAdmin restrictions - if not isSysAdmin: - if roleLabel == "sysadmin": - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot remove sysadmin role") - - # Get role by label - role = interface.getRoleByLabel(roleLabel) - if not role: - raise HTTPException( - status_code=404, - detail=f"Role '{roleLabel}' not found" - ) - - # Remove role from user's mandates - userMandates = interface.getUserMandates(userId) - - # MandateAdmin: check user's mandates overlap with admin's mandates - if not isSysAdmin: - userMandateMandateIds = {str(um.mandateId) for um in userMandates} - if not userMandateMandateIds.intersection(set(adminMandateIds)): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user") - - roleRemoved = False - - for um in userMandates: - userMandateId = str(um.id) - - # MandateAdmin: only remove from mandates they admin - if not isSysAdmin and str(um.mandateId) not in adminMandateIds: - continue - - # Remove role via interface method - if interface.removeRoleFromUserMandate(userMandateId, str(role.id)): - roleRemoved = True - - if roleRemoved: - logger.info(f"Removed role {roleLabel} from user {userId} by admin {currentUser.id}") - - userRoleLabels = _getUserRoleLabels(interface, userId) - return { - "id": user.id, - "username": user.username, - "email": user.email, - "fullName": user.fullName, - "isSysAdmin": user.isSysAdmin, - "enabled": user.enabled, - "roleLabels": userRoleLabels, - "roleCount": len(userRoleLabels) - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error removing role from user: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to remove role from user: {str(e)}" - ) - - -@router.get("/roles/{roleLabel}/users", response_model=List[Dict[str, Any]]) -@limiter.limit("60/minute") -def get_users_with_role( - request: Request, - roleLabel: str = Path(..., description="Role label"), - mandateId: Optional[str] = Query(None, description="Filter by mandate ID (via UserMandate)"), - context: RequestContext = Depends(getRequestContext) -) -> List[Dict[str, Any]]: - """ - Get all users with a specific role. - Context-aware: SysAdmin sees all. MandateAdmin sees users from own mandates only. - - Path Parameters: - - roleLabel: Role label - - Query Parameters: - - mandateId: Optional filter by mandate ID (via UserMandate table) - - Returns: - - List of users with the specified role - """ - isSysAdmin = context.hasSysAdminRole - adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) - if not isSysAdmin and not adminMandateIds: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") - currentUser = context.user # backward compat for existing code - - try: - interface = getRootInterface() - - # Get role by label - role = interface.getRoleByLabel(roleLabel) - if not role: - raise HTTPException( - status_code=404, - detail=f"Role '{roleLabel}' not found" - ) - - # Get all UserMandateRole assignments for this role (Pydantic models) - roleAssignments = interface.getUserMandateRolesByRole(str(role.id)) - - # Get unique userMandateIds - userMandateIds = {str(ra.userMandateId) for ra in roleAssignments} - - # Get userIds from UserMandate records - userIds: Set[str] = set() - for userMandateId in userMandateIds: - um = interface.getUserMandateById(userMandateId) - if um: - # Filter by mandate if specified - if mandateId and str(um.mandateId) != mandateId: - continue - # MandateAdmin: filter to own mandates - if not isSysAdmin and str(um.mandateId) not in adminMandateIds: - continue - userIds.add(str(um.userId)) - - # Get users and format response - result = [] - for userId in userIds: - user = interface.getUser(userId) - if user: - userRoleLabels = _getUserRoleLabels(interface, userId) - result.append({ - "id": user.id, - "username": user.username, - "email": user.email, - "fullName": user.fullName, - "isSysAdmin": user.isSysAdmin, - "enabled": user.enabled, - "roleLabels": userRoleLabels, - "roleCount": len(userRoleLabels) - }) - - return result - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting users with role: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to get users with role: {str(e)}" - ) diff --git a/modules/routes/routeAdminRbacRules.py b/modules/routes/routeAdminRbacRules.py index 50b12f47..a7d4637e 100644 --- a/modules/routes/routeAdminRbacRules.py +++ b/modules/routes/routeAdminRbacRules.py @@ -873,9 +873,14 @@ def list_roles( if role.featureInstanceId is not None: continue - # Include global roles (mandateId=None) OR mandate-specific roles if mandateId matches - if role.mandateId is not None and (mandateId is None or role.mandateId != mandateId): - continue + # When mandateId requested: only mandate-scoped roles + # When no mandateId: only global roles (mandateId=None) + if mandateId: + if role.mandateId != mandateId: + continue + else: + if role.mandateId is not None: + continue # Filter: Exclude feature template roles unless includeTemplates=True if not includeTemplates and role.featureCode is not None: @@ -978,47 +983,6 @@ def list_roles( ) -@router.get("/roles/options", response_model=List[Dict[str, Any]]) -@limiter.limit("60/minute") -def get_role_options( - request: Request, - currentUser: User = Depends(requireSysAdminRole) -) -> List[Dict[str, Any]]: - """ - Get role options for select dropdowns. - MULTI-TENANT: SysAdmin-only. - - Returns: - - List of role option dictionaries with value and label - """ - try: - interface = getRootInterface() - - # Get all roles from database - dbRoles = interface.getAllRoles() - - # Convert to options format - options = [] - for role in dbRoles: - # Use English description as label, fallback to roleLabel - label = role.description.get("en", role.roleLabel) if isinstance(role.description, dict) else role.roleLabel - options.append({ - "value": role.roleLabel, - "label": label - }) - - return options - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting role options: {str(e)}") - raise HTTPException( - status_code=500, - detail=f"Failed to get role options: {str(e)}" - ) - - @router.post("/roles", response_model=Dict[str, Any]) @limiter.limit("30/minute") def create_role( diff --git a/modules/routes/routeDataMandates.py b/modules/routes/routeDataMandates.py index b7feacbc..ff4bab3e 100644 --- a/modules/routes/routeDataMandates.py +++ b/modules/routes/routeDataMandates.py @@ -540,21 +540,13 @@ def add_user_to_mandate( """ Add a user to a mandate with specified roles. - Requires Mandate-Admin role. - SysAdmin cannot add themselves (Self-Eskalation Prevention). + Requires Mandate-Admin role (SysAdmin passes automatically). Args: targetMandateId: Target mandate ID data: User ID and role IDs to assign """ - # 1. SysAdmin Self-Eskalation Prevention - if context.hasSysAdminRole and data.targetUserId == str(context.user.id): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="SysAdmin cannot add themselves to a mandate. A Mandate-Admin must grant access." - ) - - # 2. Check Mandate-Admin permission + # Check Mandate-Admin permission if not _hasMandateAdminRole(context, targetMandateId): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, diff --git a/modules/routes/routeStore.py b/modules/routes/routeStore.py new file mode 100644 index 00000000..5bb18103 --- /dev/null +++ b/modules/routes/routeStore.py @@ -0,0 +1,344 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Feature Store routes. +Allows users to self-activate features in the root mandate's shared instances. + +Architecture: Shared Instance Pattern +- Each store feature has exactly 1 instance in the root mandate (created at bootstrap) +- Users activate by getting FeatureAccess + user-role on the shared instance +- Data isolation is guaranteed by read="m" (WHERE _createdBy = userId) +""" + +from fastapi import APIRouter, HTTPException, Depends, Request +from typing import List, Dict, Any +from fastapi import status +import logging +from pydantic import BaseModel, Field + +from modules.auth import limiter, getRequestContext, RequestContext +from modules.datamodels.datamodelFeatures import FeatureInstance +from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole +from modules.datamodels.datamodelRbac import AccessRuleContext +from modules.datamodels.datamodelUam import Mandate +from modules.interfaces.interfaceDbApp import getRootInterface +from modules.interfaces.interfaceFeatures import getFeatureInterface +from modules.security.rbacCatalog import getCatalogService +from modules.security.rbac import RbacClass +from modules.security.rootAccess import getRootDbAppConnector + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/api/store", + tags=["Store"], + responses={404: {"description": "Not found"}} +) + + +class StoreActivateRequest(BaseModel): + """Request model for activating a store feature.""" + featureCode: str = Field(..., description="Feature code to activate (e.g., 'automation')") + + +class StoreFeatureResponse(BaseModel): + """Response model for a store feature.""" + featureCode: str + label: Dict[str, str] + icon: str + description: Dict[str, str] = {} + isActive: bool + canActivate: bool + instanceId: str | None = None + + +def _getRootMandateId(db) -> str | None: + """Find the root mandate ID.""" + mandates = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True}) + if mandates: + return mandates[0].get("id") + return None + + +def _getStoreFeatures(catalogService) -> List[Dict[str, Any]]: + """Get all features that are available in the store (have resource.store.* entries).""" + resourceObjects = catalogService.getResourceObjects() + storeFeatures = [] + for obj in resourceObjects: + meta = obj.get("meta", {}) + if meta.get("category") == "store": + featureCode = meta.get("featureCode") + if featureCode: + featureDef = catalogService.getFeatureDefinition(featureCode) + if featureDef: + storeFeatures.append(featureDef) + return storeFeatures + + +def _checkStorePermission(context: RequestContext, featureCode: str) -> bool: + """Check if user has RBAC permission to activate a store feature.""" + if context.hasSysAdminRole: + return True + + resourceItem = f"resource.store.{featureCode}" + dbApp = getRootDbAppConnector() + rbacInstance = RbacClass(dbApp, dbApp=dbApp) + permissions = rbacInstance.getUserPermissions( + context.user, + AccessRuleContext.RESOURCE, + resourceItem, + mandateId=str(context.mandateId) if context.mandateId else None, + ) + return permissions.view + + +def _findSharedInstance(db, rootMandateId: str, featureCode: str) -> Dict[str, Any] | None: + """Find the shared instance for a feature in the root mandate.""" + instances = db.getRecordset( + FeatureInstance, + recordFilter={"mandateId": rootMandateId, "featureCode": featureCode} + ) + return instances[0] if instances else None + + +def _getUserFeatureAccess(db, userId: str, instanceId: str) -> Dict[str, Any] | None: + """Check if user already has FeatureAccess for an instance.""" + accesses = db.getRecordset( + FeatureAccess, + recordFilter={"userId": userId, "featureInstanceId": instanceId} + ) + return accesses[0] if accesses else None + + +def _findUserRole(rootInterface, instanceId: str, featureCode: str) -> str | None: + """Find the user-level role for a feature instance.""" + instanceRoles = rootInterface.getRolesByFeatureInstance(instanceId) + userRoleLabel = f"{featureCode}-user" + for role in instanceRoles: + if role.roleLabel == userRoleLabel: + return str(role.id) + for role in instanceRoles: + if "user" in role.roleLabel.lower() and "admin" not in role.roleLabel.lower(): + return str(role.id) + return None + + +@router.get("/features", response_model=List[StoreFeatureResponse]) +@limiter.limit("60/minute") +def listStoreFeatures( + request: Request, + context: RequestContext = Depends(getRequestContext) +) -> List[StoreFeatureResponse]: + """ + List all store features with activation status and permissions. + + Returns the store catalog showing which features are available, + which are already activated, and whether the user can activate them. + """ + try: + rootInterface = getRootInterface() + db = rootInterface.db + catalogService = getCatalogService() + + rootMandateId = _getRootMandateId(db) + if not rootMandateId: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Root mandate not found" + ) + + storeFeatures = _getStoreFeatures(catalogService) + userId = str(context.user.id) + result = [] + + for featureDef in storeFeatures: + featureCode = featureDef["code"] + sharedInstance = _findSharedInstance(db, rootMandateId, featureCode) + instanceId = sharedInstance.get("id") if sharedInstance else None + + isActive = False + if instanceId: + existingAccess = _getUserFeatureAccess(db, userId, instanceId) + isActive = existingAccess is not None + + canActivate = _checkStorePermission(context, featureCode) and not isActive + + result.append(StoreFeatureResponse( + featureCode=featureCode, + label=featureDef.get("label", {}), + icon=featureDef.get("icon", "mdi-puzzle"), + isActive=isActive, + canActivate=canActivate, + instanceId=instanceId, + )) + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error listing store features: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to list store features: {str(e)}" + ) + + +@router.post("/activate", response_model=Dict[str, Any]) +@limiter.limit("10/minute") +def activateStoreFeature( + request: Request, + data: StoreActivateRequest, + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """ + Activate a store feature for the current user. + + Creates FeatureAccess + FeatureAccessRole on the shared instance + in the root mandate. The user gets the feature's user-level role. + """ + featureCode = data.featureCode + userId = str(context.user.id) + + try: + rootInterface = getRootInterface() + db = rootInterface.db + + if not _checkStorePermission(context, featureCode): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"No permission to activate feature '{featureCode}'" + ) + + catalogService = getCatalogService() + featureDef = catalogService.getFeatureDefinition(featureCode) + if not featureDef: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Feature '{featureCode}' not found" + ) + + rootMandateId = _getRootMandateId(db) + if not rootMandateId: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Root mandate not found" + ) + + sharedInstance = _findSharedInstance(db, rootMandateId, featureCode) + if not sharedInstance: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Shared instance for '{featureCode}' not found in root mandate" + ) + + instanceId = sharedInstance.get("id") + + existingAccess = _getUserFeatureAccess(db, userId, instanceId) + if existingAccess: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Feature '{featureCode}' is already active" + ) + + featureAccess = FeatureAccess( + userId=userId, + featureInstanceId=instanceId, + enabled=True + ) + createdAccess = db.recordCreate(FeatureAccess, featureAccess.model_dump()) + featureAccessId = createdAccess.get("id") + + userRoleId = _findUserRole(rootInterface, instanceId, featureCode) + if userRoleId: + featureAccessRole = FeatureAccessRole( + featureAccessId=featureAccessId, + roleId=userRoleId + ) + db.recordCreate(FeatureAccessRole, featureAccessRole.model_dump()) + + logger.info( + f"User {userId} activated store feature '{featureCode}' " + f"(instance={instanceId}, role={userRoleId})" + ) + + return { + "featureCode": featureCode, + "instanceId": instanceId, + "featureAccessId": featureAccessId, + "roleId": userRoleId, + "activated": True, + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error activating store feature '{featureCode}': {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to activate feature: {str(e)}" + ) + + +@router.post("/deactivate", response_model=Dict[str, Any]) +@limiter.limit("10/minute") +def deactivateStoreFeature( + request: Request, + data: StoreActivateRequest, + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """ + Deactivate a store feature for the current user. + + Removes FeatureAccess (CASCADE deletes FeatureAccessRole). + User loses access immediately. + """ + featureCode = data.featureCode + userId = str(context.user.id) + + try: + rootInterface = getRootInterface() + db = rootInterface.db + + rootMandateId = _getRootMandateId(db) + if not rootMandateId: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Root mandate not found" + ) + + sharedInstance = _findSharedInstance(db, rootMandateId, featureCode) + if not sharedInstance: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Shared instance for '{featureCode}' not found" + ) + + instanceId = sharedInstance.get("id") + + existingAccess = _getUserFeatureAccess(db, userId, instanceId) + if not existingAccess: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Feature '{featureCode}' is not active" + ) + + featureAccessId = existingAccess.get("id") + db.recordDelete(FeatureAccess, featureAccessId) + + logger.info(f"User {userId} deactivated store feature '{featureCode}' (instance={instanceId})") + + return { + "featureCode": featureCode, + "instanceId": instanceId, + "deactivated": True, + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deactivating store feature '{featureCode}': {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to deactivate feature: {str(e)}" + ) diff --git a/modules/system/mainSystem.py b/modules/system/mainSystem.py index 75a92401..feb997e8 100644 --- a/modules/system/mainSystem.py +++ b/modules/system/mainSystem.py @@ -49,6 +49,15 @@ NAVIGATION_SECTIONS = [ "order": 10, "public": True, }, + { + "id": "store", + "objectKey": "ui.system.store", + "label": {"en": "Store", "de": "Store", "fr": "Store"}, + "icon": "FaStore", + "path": "/store", + "order": 15, + "public": True, + }, { "id": "settings", "objectKey": "ui.system.settings", @@ -423,6 +432,21 @@ DATA_OBJECTS = [ # ============================================================================= RESOURCE_OBJECTS = [ + { + "objectKey": "resource.store.automation", + "label": {"en": "Store: Automation", "de": "Store: Automation", "fr": "Store: Automatisation"}, + "meta": {"category": "store", "featureCode": "automation"} + }, + { + "objectKey": "resource.store.chatplayground", + "label": {"en": "Store: Chat Playground", "de": "Store: Chat Playground", "fr": "Store: Chat Playground"}, + "meta": {"category": "store", "featureCode": "chatplayground"} + }, + { + "objectKey": "resource.store.teamsbot", + "label": {"en": "Store: Teams Bot", "de": "Store: Teams Bot", "fr": "Store: Teams Bot"}, + "meta": {"category": "store", "featureCode": "teamsbot"} + }, { "objectKey": "resource.system.api.auth", "label": {"en": "Authentication API", "de": "Authentifizierungs-API", "fr": "API d'authentification"},