fix: RBAC bootstrap anthropic for user, FeatureAccess response, workspace UI repair, user access overview, RBAC tests

Made-with: Cursor
This commit is contained in:
ValueOn AG 2026-03-23 10:29:23 +01:00
parent 2345ff669a
commit f796ae3807
6 changed files with 360 additions and 331 deletions

View file

@ -226,6 +226,8 @@ def _syncTemplateRolesToDb() -> int:
if createdCount > 0: if createdCount > 0:
logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles") logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")
_repairWorkspaceUserInstanceUiNav(rootInterface)
return createdCount return createdCount
except Exception as e: except Exception as e:
@ -233,6 +235,57 @@ def _syncTemplateRolesToDb() -> int:
return 0 return 0
def _repairWorkspaceUserInstanceUiNav(rootInterface) -> int:
"""
Ensure every instance-scoped workspace-user role grants UI view on Editor and Settings.
Covers older instance roles copied before template updates (bootstrap / app startup).
"""
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext, Role
workspaceNavObjectKeys = (
"ui.feature.workspace.editor",
"ui.feature.workspace.settings",
)
repairCount = 0
try:
userRoleRecords = rootInterface.db.getRecordset(
Role,
recordFilter={"featureCode": FEATURE_CODE, "roleLabel": "workspace-user"},
)
for roleRec in userRoleRecords or []:
if not roleRec.get("featureInstanceId"):
continue
roleId = str(roleRec.get("id"))
accessRules = rootInterface.getAccessRulesByRole(roleId)
rulesByUiKey = {(r.context, r.item): r for r in accessRules}
for objectKey in workspaceNavObjectKeys:
uiKey = (AccessRuleContext.UI, objectKey)
existingRule = rulesByUiKey.get(uiKey)
if existingRule is None:
newRule = AccessRule(
roleId=roleId,
context=AccessRuleContext.UI,
item=objectKey,
view=True,
read=None,
create=None,
update=None,
delete=None,
)
rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
repairCount += 1
elif not existingRule.view:
rootInterface.db.recordModify(AccessRule, str(existingRule.id), {"view": True})
repairCount += 1
if repairCount:
logger.info(
f"Feature '{FEATURE_CODE}': Repaired {repairCount} UI AccessRules for instance workspace-user roles (Editor/Settings)"
)
except Exception as e:
logger.warning(f"Feature '{FEATURE_CODE}': workspace-user UI nav repair skipped: {e}")
return repairCount
def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int: def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
"""Ensure AccessRules exist for a role based on templates.""" """Ensure AccessRules exist for a role based on templates."""
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext

View file

@ -1871,7 +1871,7 @@ def _createAicoreProviderRules(db: DatabaseConnector) -> None:
Provider access per role: Provider access per role:
- admin: all providers allowed - admin: all providers allowed
- user: all providers EXCEPT anthropic (view=False) - user: all providers allowed
- viewer: NO provider access (viewer has no RESOURCE permissions) - viewer: NO provider access (viewer has no RESOURCE permissions)
NOTE: Provider list is dynamically discovered from AICore model registry. NOTE: Provider list is dynamically discovered from AICore model registry.
@ -1916,7 +1916,7 @@ def _createAicoreProviderRules(db: DatabaseConnector) -> None:
read=None, create=None, update=None, delete=None, read=None, create=None, update=None, delete=None,
)) ))
# User: access to all providers EXCEPT anthropic # User: access to all providers (same provider keys as admin)
userId = _getRoleId(db, "user") userId = _getRoleId(db, "user")
if userId: if userId:
for provider in providers: for provider in providers:
@ -1930,13 +1930,11 @@ def _createAicoreProviderRules(db: DatabaseConnector) -> None:
} }
) )
if not existingRules: if not existingRules:
# Anthropic is not allowed for user role
isAllowed = provider != "anthropic"
providerRules.append(AccessRule( providerRules.append(AccessRule(
roleId=userId, roleId=userId,
context=AccessRuleContext.RESOURCE, context=AccessRuleContext.RESOURCE,
item=resourceKey, item=resourceKey,
view=isAllowed, view=True,
read=None, create=None, update=None, delete=None, read=None, create=None, update=None, delete=None,
)) ))

View file

@ -1408,7 +1408,7 @@ def update_feature_instance_user_roles(
"userId": userId, "userId": userId,
"featureInstanceId": instanceId, "featureInstanceId": instanceId,
"roleIds": data.roleIds, "roleIds": data.roleIds,
"enabled": data.enabled if data.enabled is not None else existingAccess[0].get("enabled", True) "enabled": data.enabled if data.enabled is not None else bool(existingAccess.enabled),
} }
except HTTPException: except HTTPException:

View file

@ -278,7 +278,8 @@ def getUserAccessOverview(
# Get mandate name # Get mandate name
mandate = interface.getMandate(umMandateId) mandate = interface.getMandate(umMandateId)
mandateName = mandate.name if mandate else umMandateId mandateName = mandate.name if mandate else umMandateId
mandateLabel = (mandate.label or None) if mandate else None
# Get roles for this UserMandate using interface method # Get roles for this UserMandate using interface method
umRoles = interface.getUserMandateRoles(umId) umRoles = interface.getUserMandateRoles(umId)
@ -368,6 +369,7 @@ def getUserAccessOverview(
mandatesInfo.append({ mandatesInfo.append({
"id": umMandateId, "id": umMandateId,
"name": mandateName, "name": mandateName,
"label": mandateLabel,
"roleIds": mandateRoleIds, "roleIds": mandateRoleIds,
"featureInstances": featureInstancesInfo, "featureInstances": featureInstancesInfo,
}) })

View file

@ -5,165 +5,155 @@ Unit tests for RBAC bootstrap initialization.
Tests that bootstrap creates correct rules and initial data. Tests that bootstrap creates correct rules and initial data.
""" """
import pytest from unittest.mock import Mock, patch
from unittest.mock import Mock, MagicMock, patch
from modules.interfaces.interfaceBootstrap import ( from modules.interfaces.interfaceBootstrap import (
initBootstrap,
initRootMandate, initRootMandate,
initAdminUser, initAdminUser,
initEventUser, initEventUser,
initRbacRules, initRbacRules,
createDefaultRoleRules, _createDefaultRoleRules,
createTableSpecificRules _createTableSpecificRules,
) )
from modules.datamodels.datamodelUam import UserInDB, Mandate, AuthAuthority from modules.datamodels.datamodelUam import UserInDB, Mandate
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelUam import AccessLevel from modules.datamodels.datamodelUam import AccessLevel
class TestRbacBootstrap: class TestRbacBootstrap:
"""Test RBAC bootstrap initialization.""" """Test RBAC bootstrap initialization."""
def testInitRootMandateCreatesIfNotExists(self): def testInitRootMandateCreatesIfNotExists(self):
"""Test that initRootMandate creates mandate if it doesn't exist.""" """Test that initRootMandate creates mandate if it doesn't exist."""
db = Mock() db = Mock()
db.getRecordset = Mock(return_value=[]) # No existing mandates db.getRecordset = Mock(return_value=[])
db.recordCreate = Mock(return_value={"id": "mandate1", "name": "Root"}) db.recordCreate = Mock(return_value={"id": "mandate1", "name": "Root"})
mandateId = initRootMandate(db) mandateId = initRootMandate(db)
assert mandateId == "mandate1" assert mandateId == "mandate1"
db.recordCreate.assert_called_once() db.recordCreate.assert_called_once()
callArgs = db.recordCreate.call_args callArgs = db.recordCreate.call_args
assert isinstance(callArgs[0][1], Mandate) assert isinstance(callArgs[0][1], Mandate)
assert callArgs[0][1].name == "Root" assert callArgs[0][1].name == "root"
assert callArgs[0][1].label == "Root"
def testInitRootMandateReturnsExisting(self): def testInitRootMandateReturnsExisting(self):
"""Test that initRootMandate returns existing mandate ID.""" """Test that initRootMandate returns existing mandate ID."""
db = Mock() db = Mock()
db.getRecordset = Mock(return_value=[{"id": "existing_mandate"}]) db.getRecordset = Mock(return_value=[{"id": "existing_mandate"}])
mandateId = initRootMandate(db) mandateId = initRootMandate(db)
assert mandateId == "existing_mandate" assert mandateId == "existing_mandate"
db.recordCreate.assert_not_called() db.recordCreate.assert_not_called()
def testInitAdminUserCreatesWithSysadminRole(self): def testInitAdminUserCreatesWithSysadminRole(self):
"""Test that initAdminUser creates user with sysadmin role.""" """Test that initAdminUser creates user with isSysAdmin=True."""
db = Mock() db = Mock()
db.getRecordset = Mock(return_value=[]) # No existing users db.getRecordset = Mock(return_value=[])
db.recordCreate = Mock(return_value={"id": "admin1", "username": "admin"}) db.recordCreate = Mock(return_value={"id": "admin1", "username": "admin"})
with patch('modules.interfaces.interfaceBootstrap._getPasswordHash', return_value="hashed"): with patch("modules.interfaces.interfaceBootstrap._getPasswordHash", return_value="hashed"):
userId = initAdminUser(db, "mandate1") userId = initAdminUser(db, "mandate1")
assert userId == "admin1" assert userId == "admin1"
db.recordCreate.assert_called_once() db.recordCreate.assert_called_once()
callArgs = db.recordCreate.call_args callArgs = db.recordCreate.call_args
user = callArgs[0][1] user = callArgs[0][1]
assert isinstance(user, UserInDB) assert isinstance(user, UserInDB)
assert user.username == "admin" assert user.username == "admin"
assert "sysadmin" in user.roleLabels assert user.isSysAdmin is True
def testInitEventUserCreatesWithSysadminRole(self): def testInitEventUserCreatesWithSysadminRole(self):
"""Test that initEventUser creates user with sysadmin role.""" """Test that initEventUser creates user with isSysAdmin=True."""
db = Mock() db = Mock()
db.getRecordset = Mock(return_value=[]) # No existing users db.getRecordset = Mock(return_value=[])
db.recordCreate = Mock(return_value={"id": "event1", "username": "event"}) db.recordCreate = Mock(return_value={"id": "event1", "username": "event"})
with patch('modules.interfaces.interfaceBootstrap._getPasswordHash', return_value="hashed"): with patch("modules.interfaces.interfaceBootstrap._getPasswordHash", return_value="hashed"):
userId = initEventUser(db, "mandate1") userId = initEventUser(db, "mandate1")
assert userId == "event1" assert userId == "event1"
db.recordCreate.assert_called_once() db.recordCreate.assert_called_once()
callArgs = db.recordCreate.call_args callArgs = db.recordCreate.call_args
user = callArgs[0][1] user = callArgs[0][1]
assert isinstance(user, UserInDB) assert isinstance(user, UserInDB)
assert user.username == "event" assert user.username == "event"
assert "sysadmin" in user.roleLabels assert user.isSysAdmin is True
def testCreateDefaultRoleRules(self): def testCreateDefaultRoleRules(self):
"""Test that createDefaultRoleRules creates correct default rules.""" """Test that _createDefaultRoleRules creates admin + viewer generic DATA rules."""
db = Mock() db = Mock()
db.recordCreate = Mock() db.recordCreate = Mock()
createDefaultRoleRules(db) with patch(
"modules.interfaces.interfaceBootstrap._getRoleId",
# Should create 4 default rules (sysadmin, admin, user, viewer) side_effect=lambda d, label: f"rid-{label}",
assert db.recordCreate.call_count == 4 ):
_createDefaultRoleRules(db)
# Check sysadmin rule
sysadminCall = [call for call in db.recordCreate.call_args_list assert db.recordCreate.call_count == 2
if call[0][1].roleLabel == "sysadmin"][0] created = [c[0][1] for c in db.recordCreate.call_args_list]
sysadminRule = sysadminCall[0][1] byRoleId = {r.roleId: r for r in created}
assert sysadminRule.context == AccessRuleContext.DATA
assert sysadminRule.item is None adminRule = byRoleId["rid-admin"]
assert sysadminRule.view == True assert adminRule.context == AccessRuleContext.DATA
assert sysadminRule.read == AccessLevel.ALL assert adminRule.item is None
assert sysadminRule.create == AccessLevel.ALL assert adminRule.view is True
assert adminRule.read == AccessLevel.GROUP
# Check user rule assert adminRule.create == AccessLevel.GROUP
userCall = [call for call in db.recordCreate.call_args_list
if call[0][1].roleLabel == "user"][0] viewerRule = byRoleId["rid-viewer"]
userRule = userCall[0][1] assert viewerRule.read == AccessLevel.GROUP
assert userRule.read == AccessLevel.MY assert viewerRule.create == AccessLevel.NONE
assert userRule.create == AccessLevel.MY
def testCreateTableSpecificRules(self): def testCreateTableSpecificRules(self):
"""Test that createTableSpecificRules creates table-specific rules.""" """Test that _createTableSpecificRules creates table-specific rules."""
db = Mock() db = Mock()
db.recordCreate = Mock() db.recordCreate = Mock()
createTableSpecificRules(db) with patch(
"modules.interfaces.interfaceBootstrap._getRoleId",
# Should create multiple rules for different tables side_effect=lambda d, label: f"rid-{label}",
):
_createTableSpecificRules(db)
assert db.recordCreate.call_count > 0 assert db.recordCreate.call_count > 0
# Check that Mandate table rules are created with full objectKey (UAM namespace) mandateCalls = [
mandateCalls = [call for call in db.recordCreate.call_args_list call for call in db.recordCreate.call_args_list if call[0][1].item == "data.uam.Mandate"
if call[0][1].item == "data.uam.Mandate"] ]
assert len(mandateCalls) > 0 assert len(mandateCalls) > 0
# Check that all roles have view=False and no access for Mandate
# (SysAdmin bypasses RBAC via isSysAdmin flag, not via roles)
for call in mandateCalls: for call in mandateCalls:
rule = call[0][1] rule = call[0][1]
assert rule.view == False assert rule.view is False
assert rule.read == AccessLevel.NONE assert rule.read == AccessLevel.NONE
def testInitRbacRulesSkipsIfExists(self): def testInitRbacRulesSkipsIfExists(self):
"""Test that initRbacRules skips default rule creation if rules already exist, but adds missing table-specific rules.""" """When AccessRules already exist, init skips full init; ensure-* hooks are not exercised here."""
db = Mock() db = Mock()
# Mock existing rules - include rules for ChatWorkflow and AutomationDefinition to prevent adding missing rules db.getRecordset = Mock(return_value=[{"id": "existing_rule"}])
# Need rules for all required roles to fully prevent creation
# Using semantic namespace format: data.chat.{TableName}, data.automation.{TableName}
existingRules = []
for table in ["data.chat.ChatWorkflow", "data.automation.AutomationDefinition"]:
for role in ["admin", "user", "viewer"]:
existingRules.append({
"id": f"rule_{table}_{role}",
"item": table,
"context": AccessRuleContext.DATA.value,
"roleLabel": role
})
db.getRecordset = Mock(return_value=existingRules)
db.recordCreate = Mock() db.recordCreate = Mock()
initRbacRules(db) with patch("modules.interfaces.interfaceBootstrap._ensureUiContextRules"):
with patch("modules.interfaces.interfaceBootstrap._ensureDataContextRules"):
# Should not create new rules since all required tables already have rules for all roles initRbacRules(db)
db.recordCreate.assert_not_called() db.recordCreate.assert_not_called()
def testInitRbacRulesCreatesIfNotExists(self): def testInitRbacRulesCreatesIfNotExists(self):
"""Test that initRbacRules creates rules if they don't exist.""" """Test that initRbacRules creates rules when the AccessRule table is empty."""
db = Mock() db = Mock()
db.getRecordset = Mock(side_effect=[
[], # No existing rules
[] # After creating default rules
])
db.recordCreate = Mock() db.recordCreate = Mock()
db.recordModify = Mock()
initRbacRules(db) db.getRecordset = Mock(return_value=[])
# Should create rules with patch(
"modules.interfaces.interfaceBootstrap._getRoleId",
side_effect=lambda d, label: f"rid-{label}",
):
initRbacRules(db)
assert db.recordCreate.call_count > 0 assert db.recordCreate.call_count > 0

View file

@ -5,411 +5,397 @@ Unit tests for RBAC permission resolution.
Tests rule specificity, multiple roles, and permission combination logic. Tests rule specificity, multiple roles, and permission combination logic.
""" """
import pytest from unittest.mock import Mock
from modules.datamodels.datamodelUam import User, AccessLevel, UserPermissions
from modules.datamodels.datamodelUam import User, AccessLevel
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.security.rbac import RbacClass from modules.security.rbac import RbacClass
from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.connectors.connectorDbPostgre import DatabaseConnector
from unittest.mock import Mock, MagicMock
_TEST_ROLE_USER = "test-rid-user"
_TEST_ROLE_VIEWER = "test-rid-viewer"
def _patchRbacResolution(rbac, roleIds, rulesWithPriority):
"""Stub multi-tenant role loading and rule fetch so tests exercise merge logic only."""
rbac._getRoleIdsForUser = Mock(return_value=roleIds)
rbac._getRulesForRoleIds = Mock(return_value=rulesWithPriority)
class TestRbacPermissionResolution: class TestRbacPermissionResolution:
"""Test RBAC permission resolution logic.""" """Test RBAC permission resolution logic."""
def testSingleRoleGenericRule(self): def testSingleRoleGenericRule(self):
"""Test permission resolution with a single role and generic rule.""" """Test permission resolution with a single role and generic rule."""
# Mock database connector
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
# Create RBAC interface
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
# Create user with single role
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=["user"], roleLabels=["user"],
mandateId="mandate1" mandateId="mandate1",
) )
# Mock rules for "user" role rules = [
def mockGetRulesForRole(roleLabel, context): (
if roleLabel == "user" and context == AccessRuleContext.DATA: 1,
return [ AccessRule(
AccessRule( roleId=_TEST_ROLE_USER,
roleLabel="user", context=AccessRuleContext.DATA,
context=AccessRuleContext.DATA, item=None,
item=None, # Generic rule view=True,
view=True, read=AccessLevel.MY,
read=AccessLevel.MY, create=AccessLevel.MY,
create=AccessLevel.MY, update=AccessLevel.MY,
update=AccessLevel.MY, delete=AccessLevel.MY,
delete=AccessLevel.MY ),
) )
] ]
return [] _patchRbacResolution(rbac, [_TEST_ROLE_USER], rules)
rbac._getRulesForRole = mockGetRulesForRole
# Get permissions for generic table
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.DATA, AccessRuleContext.DATA,
"SomeTable" "SomeTable",
) )
assert permissions.view == True assert permissions.view is True
assert permissions.read == AccessLevel.MY assert permissions.read == AccessLevel.MY
assert permissions.create == AccessLevel.MY assert permissions.create == AccessLevel.MY
assert permissions.update == AccessLevel.MY assert permissions.update == AccessLevel.MY
assert permissions.delete == AccessLevel.MY assert permissions.delete == AccessLevel.MY
def testRuleSpecificityMostSpecificWins(self): def testRuleSpecificityMostSpecificWins(self):
"""Test that most specific rule wins within a single role.""" """Test that most specific rule wins within a single role."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=["user"], roleLabels=["user"],
mandateId="mandate1" mandateId="mandate1",
) )
def mockGetRulesForRole(roleLabel, context): rules = [
if roleLabel == "user" and context == AccessRuleContext.DATA: (
return [ 1,
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item=None, # Generic rule item=None,
view=True, view=True,
read=AccessLevel.GROUP, read=AccessLevel.GROUP,
create=AccessLevel.GROUP, create=AccessLevel.GROUP,
update=AccessLevel.GROUP, update=AccessLevel.GROUP,
delete=AccessLevel.GROUP delete=AccessLevel.GROUP,
), ),
AccessRule( ),
roleLabel="user", (
context=AccessRuleContext.DATA, 1,
item="data.uam.UserInDB", # Specific rule with UAM namespace AccessRule(
view=True, roleId=_TEST_ROLE_USER,
read=AccessLevel.MY, context=AccessRuleContext.DATA,
create=AccessLevel.NONE, item="data.uam.UserInDB",
update=AccessLevel.MY, view=True,
delete=AccessLevel.NONE read=AccessLevel.MY,
) create=AccessLevel.NONE,
] update=AccessLevel.MY,
return [] delete=AccessLevel.NONE,
),
rbac._getRulesForRole = mockGetRulesForRole ),
]
# Get permissions for UserInDB table - should use specific rule _patchRbacResolution(rbac, [_TEST_ROLE_USER], rules)
# Using UAM namespace: data.uam.UserInDB
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.DATA, AccessRuleContext.DATA,
"data.uam.UserInDB" "data.uam.UserInDB",
) )
# Most specific rule should win
assert permissions.read == AccessLevel.MY assert permissions.read == AccessLevel.MY
assert permissions.create == AccessLevel.NONE assert permissions.create == AccessLevel.NONE
assert permissions.update == AccessLevel.MY assert permissions.update == AccessLevel.MY
assert permissions.delete == AccessLevel.NONE assert permissions.delete == AccessLevel.NONE
def testMultipleRolesUnionLogic(self): def testMultipleRolesUnionLogic(self):
"""Test that multiple roles use union (opening) logic.""" """Test that multiple roles use union (opening) logic for view."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
# User with multiple roles
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=["user", "viewer"], roleLabels=["user", "viewer"],
mandateId="mandate1" mandateId="mandate1",
) )
def mockGetRulesForRole(roleLabel, context): rules = [
if context == AccessRuleContext.UI: (
if roleLabel == "user": 1,
return [ AccessRule(
AccessRule( roleId=_TEST_ROLE_USER,
roleLabel="user", context=AccessRuleContext.UI,
context=AccessRuleContext.UI, item="playground",
item="playground", view=False,
view=False # User role hides playground ),
) ),
] (
elif roleLabel == "viewer": 1,
return [ AccessRule(
AccessRule( roleId=_TEST_ROLE_VIEWER,
roleLabel="viewer", context=AccessRuleContext.UI,
context=AccessRuleContext.UI, item="playground",
item="playground", view=True,
view=True # Viewer role shows playground ),
) ),
] ]
return [] _patchRbacResolution(rbac, [_TEST_ROLE_USER, _TEST_ROLE_VIEWER], rules)
rbac._getRulesForRole = mockGetRulesForRole
# Get permissions - union logic should make playground visible
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.UI, AccessRuleContext.UI,
"playground" "playground",
) )
# Union logic: if ANY role has view=true, then view=true assert permissions.view is True
assert permissions.view == True
def testViewFalseOverridesGeneric(self): def testViewFalseOverridesGeneric(self):
"""Test that specific view=false overrides generic view=true.""" """Test that specific view=false overrides generic view=true."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=["user"], roleLabels=["user"],
mandateId="mandate1" mandateId="mandate1",
) )
def mockGetRulesForRole(roleLabel, context): rules = [
if roleLabel == "user" and context == AccessRuleContext.UI: (
return [ 1,
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.UI, context=AccessRuleContext.UI,
item=None, # Generic: view all UI item=None,
view=True view=True,
), ),
AccessRule( ),
roleLabel="user", (
context=AccessRuleContext.UI, 1,
item="playground.voice.settings", # Specific: hide this AccessRule(
view=False roleId=_TEST_ROLE_USER,
) context=AccessRuleContext.UI,
] item="playground.voice.settings",
return [] view=False,
),
rbac._getRulesForRole = mockGetRulesForRole ),
]
# Get permissions for specific UI element _patchRbacResolution(rbac, [_TEST_ROLE_USER], rules)
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.UI, AccessRuleContext.UI,
"playground.voice.settings" "playground.voice.settings",
) )
# Specific rule should override generic assert permissions.view is False
assert permissions.view == False
def testNoRolesReturnsNoAccess(self): def testNoRolesReturnsNoAccess(self):
"""Test that user with no roles gets no access.""" """Test that user with no resolved role IDs gets no access."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
dbApp.getRecordset = Mock(return_value=[])
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=[], # No roles roleLabels=[],
mandateId="mandate1" mandateId="mandate1",
) )
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.DATA, AccessRuleContext.DATA,
"SomeTable" "SomeTable",
) )
assert permissions.view == False assert permissions.view is False
assert permissions.read == AccessLevel.NONE assert permissions.read == AccessLevel.NONE
assert permissions.create == AccessLevel.NONE assert permissions.create == AccessLevel.NONE
assert permissions.update == AccessLevel.NONE assert permissions.update == AccessLevel.NONE
assert permissions.delete == AccessLevel.NONE assert permissions.delete == AccessLevel.NONE
def testFindMostSpecificRule(self): def testFindMostSpecificRule(self):
"""Test findMostSpecificRule method.""" """Test findMostSpecificRule method."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
rules = [ rules = [
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item=None, # Generic item=None,
view=True, view=True,
read=AccessLevel.GROUP read=AccessLevel.GROUP,
), ),
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item="data.uam.UserInDB", # Table-level with UAM namespace item="data.uam.UserInDB",
view=True, view=True,
read=AccessLevel.MY read=AccessLevel.MY,
), ),
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item="data.uam.UserInDB.email", # Field-level - most specific item="data.uam.UserInDB.email",
view=True, view=True,
read=AccessLevel.NONE read=AccessLevel.NONE,
) ),
] ]
# Test exact match
rule = rbac.findMostSpecificRule(rules, "data.uam.UserInDB.email") rule = rbac.findMostSpecificRule(rules, "data.uam.UserInDB.email")
assert rule is not None assert rule is not None
assert rule.item == "data.uam.UserInDB.email" assert rule.item == "data.uam.UserInDB.email"
assert rule.read == AccessLevel.NONE assert rule.read == AccessLevel.NONE
# Test table-level match
rule = rbac.findMostSpecificRule(rules, "data.uam.UserInDB") rule = rbac.findMostSpecificRule(rules, "data.uam.UserInDB")
assert rule is not None assert rule is not None
assert rule.item == "data.uam.UserInDB" assert rule.item == "data.uam.UserInDB"
assert rule.read == AccessLevel.MY assert rule.read == AccessLevel.MY
# Test generic fallback
rule = rbac.findMostSpecificRule(rules, "OtherTable") rule = rbac.findMostSpecificRule(rules, "OtherTable")
assert rule is not None assert rule is not None
assert rule.item is None assert rule.item is None
assert rule.read == AccessLevel.GROUP assert rule.read == AccessLevel.GROUP
def testValidateAccessRuleOpeningRights(self): def testValidateAccessRuleOpeningRights(self):
"""Test that CUD permissions respect read permission level.""" """Test that CUD permissions respect read permission level."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
# Valid: Read=MY, Create=MY (allowed)
rule1 = AccessRule( rule1 = AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item="data.uam.UserInDB", item="data.uam.UserInDB",
view=True, view=True,
read=AccessLevel.MY, read=AccessLevel.MY,
create=AccessLevel.MY, create=AccessLevel.MY,
update=AccessLevel.MY, update=AccessLevel.MY,
delete=AccessLevel.MY delete=AccessLevel.MY,
) )
assert rbac.validateAccessRule(rule1) == True assert rbac.validateAccessRule(rule1) is True
# Invalid: Read=MY, Create=GROUP (not allowed - GROUP > MY)
rule2 = AccessRule( rule2 = AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item="data.uam.UserInDB", item="data.uam.UserInDB",
view=True, view=True,
read=AccessLevel.MY, read=AccessLevel.MY,
create=AccessLevel.GROUP, # Not allowed create=AccessLevel.GROUP,
update=AccessLevel.MY, update=AccessLevel.MY,
delete=AccessLevel.MY delete=AccessLevel.MY,
) )
assert rbac.validateAccessRule(rule2) == False assert rbac.validateAccessRule(rule2) is False
# Valid: Read=GROUP, Create=GROUP (allowed)
rule3 = AccessRule( rule3 = AccessRule(
roleLabel="admin", roleId="test-rid-admin",
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item="data.uam.UserInDB", item="data.uam.UserInDB",
view=True, view=True,
read=AccessLevel.GROUP, read=AccessLevel.GROUP,
create=AccessLevel.GROUP, create=AccessLevel.GROUP,
update=AccessLevel.GROUP, update=AccessLevel.GROUP,
delete=AccessLevel.GROUP delete=AccessLevel.GROUP,
) )
assert rbac.validateAccessRule(rule3) == True assert rbac.validateAccessRule(rule3) is True
# Invalid: Read=NONE, Create=MY (not allowed - no read access)
rule4 = AccessRule( rule4 = AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.DATA, context=AccessRuleContext.DATA,
item="data.uam.UserInDB", item="data.uam.UserInDB",
view=True, view=True,
read=AccessLevel.NONE, read=AccessLevel.NONE,
create=AccessLevel.MY, # Not allowed without read create=AccessLevel.MY,
update=AccessLevel.MY, update=AccessLevel.MY,
delete=AccessLevel.MY delete=AccessLevel.MY,
) )
assert rbac.validateAccessRule(rule4) == False assert rbac.validateAccessRule(rule4) is False
def testUiContextOnlyViewMatters(self): def testUiContextOnlyViewMatters(self):
"""Test that UI context only checks view permission.""" """Test that UI context only checks view permission."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=["user"], roleLabels=["user"],
mandateId="mandate1" mandateId="mandate1",
) )
def mockGetRulesForRole(roleLabel, context): rules = [
if roleLabel == "user" and context == AccessRuleContext.UI: (
return [ 1,
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.UI, context=AccessRuleContext.UI,
item="playground", item="playground",
view=True view=True,
# No read/create/update/delete for UI context ),
) )
] ]
return [] _patchRbacResolution(rbac, [_TEST_ROLE_USER], rules)
rbac._getRulesForRole = mockGetRulesForRole
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.UI, AccessRuleContext.UI,
"playground" "playground",
) )
assert permissions.view == True assert permissions.view is True
# Other permissions don't matter for UI context
def testResourceContextOnlyViewMatters(self): def testResourceContextOnlyViewMatters(self):
"""Test that RESOURCE context only checks view permission.""" """Test that RESOURCE context only checks view permission."""
db = Mock(spec=DatabaseConnector) db = Mock(spec=DatabaseConnector)
dbApp = Mock(spec=DatabaseConnector) dbApp = Mock(spec=DatabaseConnector)
rbac = RbacClass(db, dbApp=dbApp) rbac = RbacClass(db, dbApp=dbApp)
user = User( user = User(
id="user1", id="user1",
username="testuser", username="testuser",
roleLabels=["user"], roleLabels=["user"],
mandateId="mandate1" mandateId="mandate1",
) )
def mockGetRulesForRole(roleLabel, context): rules = [
if roleLabel == "user" and context == AccessRuleContext.RESOURCE: (
return [ 1,
AccessRule( AccessRule(
roleLabel="user", roleId=_TEST_ROLE_USER,
context=AccessRuleContext.RESOURCE, context=AccessRuleContext.RESOURCE,
item="ai.model.anthropic", item="ai.model.anthropic",
view=True view=True,
) ),
] )
return [] ]
_patchRbacResolution(rbac, [_TEST_ROLE_USER], rules)
rbac._getRulesForRole = mockGetRulesForRole
permissions = rbac.getUserPermissions( permissions = rbac.getUserPermissions(
user, user,
AccessRuleContext.RESOURCE, AccessRuleContext.RESOURCE,
"ai.model.anthropic" "ai.model.anthropic",
) )
assert permissions.view == True assert permissions.view is True