From 6e6cf7012b4d74b2057ae2852c911a62915688b7 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 7 Dec 2025 22:00:55 +0100
Subject: [PATCH] rbac module testing done
---
docs/rbac_getrecordset_review.md | 135 +++++++++
modules/datamodels/datamodelUam.py | 7 -
modules/interfaces/interfaceBootstrap.py | 3 -
modules/interfaces/interfaceDbAppObjects.py | 36 ++-
modules/interfaces/interfaceDbChatObjects.py | 28 +-
.../interfaces/interfaceDbComponentObjects.py | 23 +-
modules/migration/__init__.py | 1 -
modules/migration/migrateUamToRbac.py | 212 -------------
modules/routes/routeAdminAutomationEvents.py | 8 +-
modules/routes/routeDataUsers.py | 6 +-
modules/routes/routeSecurityAdmin.py | 11 +-
modules/routes/routeSecurityLocal.py | 7 +-
modules/routes/routeWorkflows.py | 8 +-
tests/integration/rbac/test_rbac_migration.py | 282 ------------------
tests/unit/rbac/test_rbac_bootstrap.py | 3 +-
15 files changed, 209 insertions(+), 561 deletions(-)
create mode 100644 docs/rbac_getrecordset_review.md
delete mode 100644 modules/migration/__init__.py
delete mode 100644 modules/migration/migrateUamToRbac.py
delete mode 100644 tests/integration/rbac/test_rbac_migration.py
diff --git a/docs/rbac_getrecordset_review.md b/docs/rbac_getrecordset_review.md
new file mode 100644
index 00000000..d2c06524
--- /dev/null
+++ b/docs/rbac_getrecordset_review.md
@@ -0,0 +1,135 @@
+# RBAC getRecordset() Review
+
+## Overview
+Review of all `getRecordset()` calls in `interfaceDbChatObjects.py` and `interfaceDbComponentObjects.py` to determine which should be converted to `getRecordsetWithRBAC()`.
+
+## Analysis Criteria
+- **Convert to RBAC**: User-facing data that should respect access control
+- **Keep as-is**: Internal/technical operations that don't need RBAC filtering
+
+---
+
+## interfaceDbChatObjects.py
+
+### Summary: **14 calls found - ALL should be converted to `getRecordsetWithRBAC()`**
+
+All calls access user-facing data (ChatMessage, ChatDocument, ChatStat, ChatLog) and should respect RBAC even when:
+- Used in cascade delete operations (after parent access is verified)
+- Used to fetch child records (after parent access is verified)
+- Used for existence checks
+
+**Rationale**: RBAC should be applied at every data access point to ensure consistent security and prevent potential bypass scenarios.
+
+### Detailed List:
+
+1. **Line 760** - `deleteWorkflow()` - Cascade delete ChatStat
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"messageId": messageId})`
+ - **Reason**: Deleting related data should respect RBAC
+
+2. **Line 765** - `deleteWorkflow()` - Cascade delete ChatDocument
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})`
+ - **Reason**: Deleting related data should respect RBAC
+
+3. **Line 773** - `deleteWorkflow()` - Cascade delete ChatStat (workflow level)
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Deleting related data should respect RBAC
+
+4. **Line 778** - `deleteWorkflow()` - Cascade delete ChatLog
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatLog, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Deleting related data should respect RBAC
+
+5. **Line 821** - `getMessages()` - Fetch messages for workflow
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatMessage, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Child records should still respect RBAC even if parent access is verified
+
+6. **Line 1062** - `updateMessage()` - Check if message exists
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatMessage, self.currentUser, recordFilter={"id": messageId})`
+ - **Reason**: Existence checks should respect RBAC
+
+7. **Line 1167** - `deleteMessage()` - Cascade delete ChatStat
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"messageId": messageId})`
+ - **Reason**: Deleting related data should respect RBAC
+
+8. **Line 1172** - `deleteMessage()` - Cascade delete ChatDocument
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})`
+ - **Reason**: Deleting related data should respect RBAC
+
+9. **Line 1199** - `deleteFileFromMessage()` - Get documents for message
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})`
+ - **Reason**: Accessing related data should respect RBAC
+
+10. **Line 1242** - `getDocuments()` - Get documents for message
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})`
+ - **Reason**: Public method accessing user data should respect RBAC
+
+11. **Line 1291** - `getLogs()` - Fetch logs for workflow
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatLog, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Child records should still respect RBAC even if parent access is verified
+
+12. **Line 1410** - `getStats()` - Fetch stats for workflow
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Child records should still respect RBAC even if parent access is verified
+
+13. **Line 1460** - `getUnifiedChatData()` - Fetch messages for workflow
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatMessage, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Child records should still respect RBAC even if parent access is verified
+
+14. **Line 1501** - `getUnifiedChatData()` - Fetch logs for workflow
+ - **Action**: Convert to `getRecordsetWithRBAC(ChatLog, self.currentUser, recordFilter={"workflowId": workflowId})`
+ - **Reason**: Child records should still respect RBAC even if parent access is verified
+
+---
+
+## interfaceDbComponentObjects.py
+
+### Summary: **3 calls found - 1 keep as-is, 2 should be converted**
+
+### Detailed List:
+
+1. **Line 149** - `_initializeStandardPrompts()` - Check if prompts exist
+ - **Action**: **KEEP AS-IS** ✅
+ - **Reason**: This is initialization code that runs during bootstrap. It checks if any prompts exist to avoid re-initialization. Since this runs with root user context and is a system-level check, RBAC is not needed here.
+
+2. **Line 947** - `deleteFile()` - Get FileData for deletion
+ - **Action**: **CONVERT** to `getRecordsetWithRBAC(FileData, self.currentUser, recordFilter={"id": fileId})`
+ - **Reason**: FileData stores binary data associated with FileItem. While it's a technical table, we should still respect RBAC for consistency and security. The file access was already checked via `getFile()`, but FileData access should also be RBAC-filtered.
+
+3. **Line 1032** - `getFileData()` - Get FileData for reading
+ - **Action**: **CONVERT** to `getRecordsetWithRBAC(FileData, self.currentUser, recordFilter={"id": fileId})`
+ - **Reason**: FileData access should respect RBAC. The file access was already checked via `getFile()`, but FileData access should also be RBAC-filtered for consistency.
+
+**Note on FileData**: FileData is a technical table storing binary file content. However, for consistency and security, RBAC should still be applied. If FileData doesn't have RBAC rules defined, the RBAC filter will effectively be a no-op (allowing access), but the pattern is consistent.
+
+---
+
+## Implementation Priority
+
+### High Priority (User-facing data access)
+- All `interfaceDbChatObjects.py` calls (14 calls)
+- `interfaceDbComponentObjects.py` FileData calls (2 calls)
+
+### Low Priority (System initialization)
+- `interfaceDbComponentObjects.py` Prompt initialization check (1 call) - Keep as-is
+
+---
+
+## Next Steps
+
+1. Convert all 14 calls in `interfaceDbChatObjects.py` to `getRecordsetWithRBAC()`
+2. Convert 2 FileData calls in `interfaceDbComponentObjects.py` to `getRecordsetWithRBAC()`
+3. Keep 1 Prompt initialization check as-is
+4. Test all changes to ensure RBAC filtering works correctly
+5. Verify cascade delete operations still work correctly with RBAC
+
+---
+
+## Testing Checklist
+
+After conversion, verify:
+- [ ] Workflow deletion still works (cascade deletes)
+- [ ] Message deletion still works (cascade deletes)
+- [ ] File deletion still works (FileData cleanup)
+- [ ] File reading still works (FileData access)
+- [ ] Child record access (messages, logs, stats, documents) respects RBAC
+- [ ] Users can only access data they have permission for
+- [ ] No performance degradation from RBAC filtering
diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py
index 4c9e0a84..49e62beb 100644
--- a/modules/datamodels/datamodelUam.py
+++ b/modules/datamodels/datamodelUam.py
@@ -13,11 +13,6 @@ class AuthAuthority(str, Enum):
GOOGLE = "google"
MSFT = "msft"
-class UserPrivilege(str, Enum): # TODO: TO remove, one new RBAC System is in place!
- SYSADMIN = "sysadmin"
- ADMIN = "admin"
- USER = "user"
-
class ConnectionStatus(str, Enum):
ACTIVE = "active"
EXPIRED = "expired"
@@ -152,7 +147,6 @@ class User(BaseModel):
{"value": "it", "label": {"en": "Italiano", "fr": "Italien"}},
]})
enabled: bool = Field(default=True, description="Indicates whether the user is enabled", json_schema_extra={"frontend_type": "checkbox", "frontend_readonly": False, "frontend_required": False})
- privilege: UserPrivilege = Field(default=UserPrivilege.USER, description="Permission level (DEPRECATED: use roleLabels instead)", json_schema_extra={"frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": "user.role"})
roleLabels: List[str] = Field(
default_factory=list,
description="List of role labels assigned to this user. All roles are opening roles (union) - if one role enables something, it is enabled.",
@@ -174,7 +168,6 @@ registerModelLabels(
"fullName": {"en": "Full Name", "fr": "Nom complet"},
"language": {"en": "Language", "fr": "Langue"},
"enabled": {"en": "Enabled", "fr": "Activé"},
- "privilege": {"en": "Privilege", "fr": "Privilège"},
"roleLabels": {"en": "Role Labels", "fr": "Labels de rôle"},
"authenticationAuthority": {"en": "Auth Authority", "fr": "Autorité d'authentification"},
"mandateId": {"en": "Mandate ID", "fr": "ID de mandat"},
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 5c4a90a1..55d94c3c 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -11,7 +11,6 @@ from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
Mandate,
UserInDB,
- UserPrivilege,
AuthAuthority,
)
from modules.datamodels.datamodelRbac import (
@@ -103,7 +102,6 @@ def initAdminUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[s
fullName="Administrator",
enabled=True,
language="en",
- privilege=UserPrivilege.SYSADMIN,
roleLabels=["sysadmin"],
authenticationAuthority=AuthAuthority.LOCAL,
hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_ADMIN_SECRET")),
@@ -140,7 +138,6 @@ def initEventUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[s
fullName="Event",
enabled=True,
language="en",
- privilege=UserPrivilege.SYSADMIN,
roleLabels=["sysadmin"],
authenticationAuthority=AuthAuthority.LOCAL,
hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_EVENT_SECRET")),
diff --git a/modules/interfaces/interfaceDbAppObjects.py b/modules/interfaces/interfaceDbAppObjects.py
index 900f7328..cf582fa2 100644
--- a/modules/interfaces/interfaceDbAppObjects.py
+++ b/modules/interfaces/interfaceDbAppObjects.py
@@ -20,7 +20,6 @@ from modules.datamodels.datamodelUam import (
UserInDB,
UserConnection,
AuthAuthority,
- UserPrivilege,
ConnectionStatus,
)
from modules.datamodels.datamodelRbac import (
@@ -488,19 +487,20 @@ class AppObjects:
def getUser(self, userId: str) -> Optional[User]:
"""Returns a user by ID if user has access."""
try:
- # Get all users
- users = self.db.getRecordset(UserInDB)
+ # Get users filtered by RBAC
+ users = self.db.getRecordsetWithRBAC(
+ UserInDB,
+ self.currentUser,
+ recordFilter={"id": userId}
+ )
+
if not users:
return None
- # Find user by ID
- for user_dict in users:
- if user_dict.get("id") == userId:
- # User already filtered by RBAC, just clean fields
- cleanedUser = {k: v for k, v in user_dict.items() if not k.startswith("_")}
- return User(**cleanedUser)
-
- return None
+ # User already filtered by RBAC, just clean fields
+ user_dict = users[0]
+ cleanedUser = {k: v for k, v in user_dict.items() if not k.startswith("_")}
+ return User(**cleanedUser)
except Exception as e:
logger.error(f"Error getting user by ID: {str(e)}")
@@ -542,7 +542,7 @@ class AppObjects:
fullName: str = None,
language: str = "en",
enabled: bool = True,
- privilege: UserPrivilege = UserPrivilege.USER,
+ roleLabels: List[str] = None,
authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL,
externalId: str = None,
externalUsername: str = None,
@@ -568,6 +568,10 @@ class AppObjects:
mandateId = self._getDefaultMandateId()
logger.warning(f"Using default mandate ID {mandateId} for new user {username}")
+ # Default roleLabels to ["user"] if not provided
+ if roleLabels is None or not roleLabels:
+ roleLabels = ["user"]
+
# Create user data using UserInDB model
userData = UserInDB(
username=username,
@@ -576,7 +580,7 @@ class AppObjects:
language=language,
mandateId=mandateId,
enabled=enabled,
- privilege=privilege,
+ roleLabels=roleLabels,
authenticationAuthority=authenticationAuthority,
hashedPassword=self._getPasswordHash(password) if password else None,
connections=[],
@@ -734,7 +738,11 @@ class AppObjects:
if not initialUserId:
return None
- users = self.db.getRecordset(UserInDB, recordFilter={"id": initialUserId})
+ users = self.db.getRecordsetWithRBAC(
+ UserInDB,
+ self.currentUser,
+ recordFilter={"id": initialUserId}
+ )
return users[0] if users else None
except Exception as e:
logger.error(f"Error getting initial user: {str(e)}")
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index 6093eb78..ac6df640 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -757,12 +757,12 @@ class ChatObjects:
messageId = message.id
if messageId:
# Delete message stats
- existing_stats = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
+ existing_stats = self.db.getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"messageId": messageId})
for stat in existing_stats:
self.db.recordDelete(ChatStat, stat["id"])
# Delete message documents (but NOT the files!)
- existing_docs = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
+ existing_docs = self.db.getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})
for doc in existing_docs:
self.db.recordDelete(ChatDocument, doc["id"])
@@ -770,12 +770,12 @@ class ChatObjects:
self.db.recordDelete(ChatMessage, messageId)
# 2. Delete workflow stats
- existing_stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
+ existing_stats = self.db.getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"workflowId": workflowId})
for stat in existing_stats:
self.db.recordDelete(ChatStat, stat["id"])
# 3. Delete workflow logs
- existing_logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
+ existing_logs = self.db.getRecordsetWithRBAC(ChatLog, self.currentUser, recordFilter={"workflowId": workflowId})
for log in existing_logs:
self.db.recordDelete(ChatLog, log["id"])
@@ -818,7 +818,7 @@ class ChatObjects:
return PaginatedResult(items=[], totalItems=0, totalPages=0)
# Get messages for this workflow from normalized table
- messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
+ messages = self.db.getRecordsetWithRBAC(ChatMessage, self.currentUser, recordFilter={"workflowId": workflowId})
# Convert raw messages to dict format for sorting/filtering
messageDicts = []
@@ -1059,7 +1059,7 @@ class ChatObjects:
raise ValueError("messageId cannot be empty")
# Check if message exists in database
- messages = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId})
+ messages = self.db.getRecordsetWithRBAC(ChatMessage, self.currentUser, recordFilter={"id": messageId})
if not messages:
logger.warning(f"Message with ID {messageId} does not exist in database")
@@ -1164,12 +1164,12 @@ class ChatObjects:
# CASCADE DELETE: Delete all related data first
# 1. Delete message stats
- existing_stats = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
+ existing_stats = self.db.getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"messageId": messageId})
for stat in existing_stats:
self.db.recordDelete(ChatStat, stat["id"])
# 2. Delete message documents (but NOT the files!)
- existing_docs = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
+ existing_docs = self.db.getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})
for doc in existing_docs:
self.db.recordDelete(ChatDocument, doc["id"])
@@ -1196,7 +1196,7 @@ class ChatObjects:
# Get documents for this message from normalized table
- documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
+ documents = self.db.getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})
if not documents:
logger.warning(f"No documents found for message {messageId}")
@@ -1239,7 +1239,7 @@ class ChatObjects:
def getDocuments(self, messageId: str) -> List[ChatDocument]:
"""Returns documents for a message from normalized table."""
try:
- documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
+ documents = self.db.getRecordsetWithRBAC(ChatDocument, self.currentUser, recordFilter={"messageId": messageId})
return [ChatDocument(**doc) for doc in documents]
except Exception as e:
logger.error(f"Error getting message documents: {str(e)}")
@@ -1288,7 +1288,7 @@ class ChatObjects:
return PaginatedResult(items=[], totalItems=0, totalPages=0)
# Get logs for this workflow from normalized table
- logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
+ logs = self.db.getRecordsetWithRBAC(ChatLog, self.currentUser, recordFilter={"workflowId": workflowId})
# Convert raw logs to dict format for sorting/filtering
logDicts = []
@@ -1407,7 +1407,7 @@ class ChatObjects:
return []
# Get stats for this workflow from normalized table
- stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
+ stats = self.db.getRecordsetWithRBAC(ChatStat, self.currentUser, recordFilter={"workflowId": workflowId})
if not stats:
return []
@@ -1457,7 +1457,7 @@ class ChatObjects:
items = []
# Get messages
- messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
+ messages = self.db.getRecordsetWithRBAC(ChatMessage, self.currentUser, recordFilter={"workflowId": workflowId})
for msg in messages:
# Apply timestamp filtering in Python
msgTimestamp = parseTimestamp(msg.get("publishedAt"), default=getUtcTimestamp())
@@ -1498,7 +1498,7 @@ class ChatObjects:
})
# Get logs
- logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
+ logs = self.db.getRecordsetWithRBAC(ChatLog, self.currentUser, recordFilter={"workflowId": workflowId})
for log in logs:
# Apply timestamp filtering in Python
logTimestamp = parseTimestamp(log.get("timestamp"), default=getUtcTimestamp())
diff --git a/modules/interfaces/interfaceDbComponentObjects.py b/modules/interfaces/interfaceDbComponentObjects.py
index 0e1be949..cedc1fec 100644
--- a/modules/interfaces/interfaceDbComponentObjects.py
+++ b/modules/interfaces/interfaceDbComponentObjects.py
@@ -838,10 +838,11 @@ class ComponentObjects:
def _isfileNameUnique(self, fileName: str, excludeFileId: Optional[str] = None) -> bool:
"""Checks if a fileName is unique for the current user."""
- # Get all files for current user
- files = self.db.getRecordset(FileItem, recordFilter={
- "_createdBy": self.currentUser.id
- })
+ # Get all files filtered by RBAC (will be filtered by user's access level)
+ files = self.db.getRecordsetWithRBAC(
+ FileItem,
+ self.currentUser
+ )
# Check if fileName exists (excluding the current file if updating)
for file in files:
@@ -930,16 +931,20 @@ class ComponentObjects:
if not self.checkRbacPermission(FileItem, "update", fileId):
raise PermissionError(f"No permission to delete file {fileId}")
- # Check for other references to this file (by hash)
+ # Check for other references to this file (by hash) - use RBAC to only check files user has access to
fileHash = file.fileHash
if fileHash:
- otherReferences = [f for f in self.db.getRecordset(FileItem, recordFilter={"fileHash": fileHash})
- if f["id"] != fileId]
+ allReferences = self.db.getRecordsetWithRBAC(
+ FileItem,
+ self.currentUser,
+ recordFilter={"fileHash": fileHash}
+ )
+ otherReferences = [f for f in allReferences if f["id"] != fileId]
# Only delete associated fileData if no other references exist
if not otherReferences:
try:
- fileDataEntries = self.db.getRecordset(FileData, recordFilter={"id": fileId})
+ fileDataEntries = self.db.getRecordsetWithRBAC(FileData, self.currentUser, recordFilter={"id": fileId})
if fileDataEntries:
self.db.recordDelete(FileData, fileId)
logger.debug(f"FileData for file {fileId} deleted")
@@ -1024,7 +1029,7 @@ class ComponentObjects:
logger.warning(f"No access to file ID {fileId}")
return None
- fileDataEntries = self.db.getRecordset(FileData, recordFilter={"id": fileId})
+ fileDataEntries = self.db.getRecordsetWithRBAC(FileData, self.currentUser, recordFilter={"id": fileId})
if not fileDataEntries:
logger.warning(f"No data found for file ID {fileId}")
return None
diff --git a/modules/migration/__init__.py b/modules/migration/__init__.py
deleted file mode 100644
index 49056d7c..00000000
--- a/modules/migration/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Migration modules for database schema and data migrations."""
diff --git a/modules/migration/migrateUamToRbac.py b/modules/migration/migrateUamToRbac.py
deleted file mode 100644
index 688bf8e7..00000000
--- a/modules/migration/migrateUamToRbac.py
+++ /dev/null
@@ -1,212 +0,0 @@
-"""
-Migration script to convert UAM (User Access Management) to RBAC (Role-Based Access Control).
-
-This script:
-1. Creates AccessRule table if it doesn't exist
-2. Adds roleLabels column to User table if it doesn't exist
-3. Converts User.privilege to User.roleLabels
-4. Creates initial RBAC rules based on bootstrap logic
-"""
-
-import logging
-from typing import List, Dict, Any
-from modules.connectors.connectorDbPostgre import DatabaseConnector
-from modules.shared.configuration import APP_CONFIG
-from modules.datamodels.datamodelUam import UserInDB, UserPrivilege
-from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
-from modules.datamodels.datamodelUam import AccessLevel
-from modules.interfaces.interfaceBootstrap import initRbacRules
-
-logger = logging.getLogger(__name__)
-
-
-def migrateUamToRbac(db: DatabaseConnector, dryRun: bool = False) -> Dict[str, Any]:
- """
- Migrate from UAM to RBAC system.
-
- Args:
- db: Database connector instance
- dryRun: If True, only report what would be done without making changes
-
- Returns:
- Dictionary with migration results
- """
- results = {
- "schemaChanges": [],
- "dataMigrations": [],
- "rulesCreated": 0,
- "usersUpdated": 0,
- "errors": []
- }
-
- try:
- # Step 1: Ensure AccessRule table exists
- logger.info("Step 1: Ensuring AccessRule table exists")
- if not dryRun:
- db._ensureTableExists(AccessRule)
- results["schemaChanges"].append("AccessRule table ensured")
- else:
- results["schemaChanges"].append("Would ensure AccessRule table")
-
- # Step 2: Add roleLabels column to UserInDB table if it doesn't exist
- logger.info("Step 2: Adding roleLabels column to UserInDB table")
- if not dryRun:
- try:
- with db.connection.cursor() as cursor:
- # Check if column exists
- cursor.execute("""
- SELECT column_name
- FROM information_schema.columns
- WHERE table_name = 'UserInDB' AND column_name = 'roleLabels'
- """)
- columnExists = cursor.fetchone() is not None
-
- if not columnExists:
- cursor.execute('ALTER TABLE "UserInDB" ADD COLUMN "roleLabels" JSONB DEFAULT \'[]\'::jsonb')
- db.connection.commit()
- results["schemaChanges"].append("Added roleLabels column to UserInDB")
- logger.info("Added roleLabels column to UserInDB table")
- else:
- results["schemaChanges"].append("roleLabels column already exists")
- logger.info("roleLabels column already exists in UserInDB table")
- except Exception as e:
- logger.error(f"Error adding roleLabels column: {e}")
- results["errors"].append(f"Error adding roleLabels column: {e}")
- db.connection.rollback()
- else:
- results["schemaChanges"].append("Would add roleLabels column to UserInDB")
-
- # Step 3: Convert User.privilege to User.roleLabels
- logger.info("Step 3: Converting User.privilege to User.roleLabels")
- if not dryRun:
- try:
- users = db.getRecordset(UserInDB)
- updatedCount = 0
-
- for user in users:
- privilege = user.get("privilege")
- roleLabels = user.get("roleLabels", [])
-
- # Skip if already has roleLabels
- if roleLabels and isinstance(roleLabels, list) and len(roleLabels) > 0:
- logger.debug(f"User {user.get('id')} already has roleLabels: {roleLabels}")
- continue
-
- # Convert privilege to roleLabels
- if privilege == UserPrivilege.SYSADMIN.value:
- newRoleLabels = ["sysadmin"]
- elif privilege == UserPrivilege.ADMIN.value:
- newRoleLabels = ["admin"]
- elif privilege == UserPrivilege.USER.value:
- newRoleLabels = ["user"]
- else:
- # Default to user if privilege is unknown
- newRoleLabels = ["user"]
- logger.warning(f"Unknown privilege '{privilege}' for user {user.get('id')}, defaulting to 'user'")
-
- # Update user
- user["roleLabels"] = newRoleLabels
- db.recordModify(UserInDB, user["id"], user)
- updatedCount += 1
- logger.info(f"Updated user {user.get('id')} ({user.get('username')}): {privilege} -> {newRoleLabels}")
-
- results["usersUpdated"] = updatedCount
- logger.info(f"Updated {updatedCount} users with roleLabels")
- except Exception as e:
- logger.error(f"Error converting user privileges: {e}")
- results["errors"].append(f"Error converting user privileges: {e}")
- else:
- # Dry run: count users that would be updated
- users = db.getRecordset(UserInDB)
- wouldUpdate = 0
- for user in users:
- roleLabels = user.get("roleLabels", [])
- if not roleLabels or not isinstance(roleLabels, list) or len(roleLabels) == 0:
- wouldUpdate += 1
- results["usersUpdated"] = wouldUpdate
- logger.info(f"Would update {wouldUpdate} users with roleLabels")
-
- # Step 4: Create RBAC rules if they don't exist
- logger.info("Step 4: Creating RBAC rules")
- if not dryRun:
- try:
- existingRules = db.getRecordset(AccessRule)
- if existingRules:
- results["rulesCreated"] = len(existingRules)
- results["dataMigrations"].append(f"RBAC rules already exist ({len(existingRules)} rules)")
- logger.info(f"RBAC rules already exist ({len(existingRules)} rules)")
- else:
- # Initialize RBAC rules using bootstrap logic
- initRbacRules(db)
- newRules = db.getRecordset(AccessRule)
- results["rulesCreated"] = len(newRules)
- results["dataMigrations"].append(f"Created {len(newRules)} RBAC rules")
- logger.info(f"Created {len(newRules)} RBAC rules")
- except Exception as e:
- logger.error(f"Error creating RBAC rules: {e}")
- results["errors"].append(f"Error creating RBAC rules: {e}")
- else:
- existingRules = db.getRecordset(AccessRule)
- if existingRules:
- results["rulesCreated"] = len(existingRules)
- results["dataMigrations"].append(f"RBAC rules already exist ({len(existingRules)} rules)")
- else:
- results["dataMigrations"].append("Would create RBAC rules")
-
- logger.info("Migration completed successfully")
- return results
-
- except Exception as e:
- logger.error(f"Migration failed: {e}")
- results["errors"].append(f"Migration failed: {e}")
- return results
-
-
-def validateMigration(db: DatabaseConnector) -> Dict[str, Any]:
- """
- Validate that migration was successful.
-
- Args:
- db: Database connector instance
-
- Returns:
- Dictionary with validation results
- """
- validation = {
- "valid": True,
- "issues": []
- }
-
- try:
- # Check that AccessRule table exists
- try:
- rules = db.getRecordset(AccessRule)
- if not rules:
- validation["valid"] = False
- validation["issues"].append("AccessRule table exists but has no rules")
- except Exception as e:
- validation["valid"] = False
- validation["issues"].append(f"AccessRule table does not exist or is not accessible: {e}")
-
- # Check that all users have roleLabels
- users = db.getRecordset(UserInDB)
- usersWithoutRoles = []
- for user in users:
- roleLabels = user.get("roleLabels", [])
- if not roleLabels or not isinstance(roleLabels, list) or len(roleLabels) == 0:
- usersWithoutRoles.append({
- "id": user.get("id"),
- "username": user.get("username"),
- "privilege": user.get("privilege")
- })
-
- if usersWithoutRoles:
- validation["valid"] = False
- validation["issues"].append(f"{len(usersWithoutRoles)} users without roleLabels: {[u['username'] for u in usersWithoutRoles]}")
-
- return validation
-
- except Exception as e:
- validation["valid"] = False
- validation["issues"].append(f"Validation error: {e}")
- return validation
diff --git a/modules/routes/routeAdminAutomationEvents.py b/modules/routes/routeAdminAutomationEvents.py
index dcac4f27..8eaa0ca7 100644
--- a/modules/routes/routeAdminAutomationEvents.py
+++ b/modules/routes/routeAdminAutomationEvents.py
@@ -11,7 +11,7 @@ import logging
# Import interfaces and models
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
from modules.security.auth import getCurrentUser, limiter
-from modules.datamodels.datamodelUam import User, UserPrivilege
+from modules.datamodels.datamodelUam import User
# Configure logger
logger = logging.getLogger(__name__)
@@ -30,11 +30,11 @@ router = APIRouter(
)
def requireSysadmin(currentUser: User):
- """Require sysadmin privilege"""
- if currentUser.privilege != UserPrivilege.SYSADMIN:
+ """Require sysadmin role"""
+ if "sysadmin" not in (currentUser.roleLabels or []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
- detail="Sysadmin privilege required"
+ detail="Sysadmin role required"
)
@router.get("")
diff --git a/modules/routes/routeDataUsers.py b/modules/routes/routeDataUsers.py
index 2f219b5c..017acb17 100644
--- a/modules/routes/routeDataUsers.py
+++ b/modules/routes/routeDataUsers.py
@@ -14,7 +14,7 @@ import modules.interfaces.interfaceDbAppObjects as interfaceDbAppObjects
from modules.security.auth import getCurrentUser, limiter, getCurrentUser
# Import the attribute definition and helper functions
-from modules.datamodels.datamodelUam import User, UserPrivilege
+from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Configure logger
@@ -141,7 +141,7 @@ async def create_user(
fullName=user_data.fullName,
language=user_data.language,
enabled=user_data.enabled,
- privilege=user_data.privilege,
+ roleLabels=user_data.roleLabels if user_data.roleLabels else ["user"],
authenticationAuthority=user_data.authenticationAuthority
)
@@ -188,7 +188,7 @@ async def reset_user_password(
"""Reset user password (Admin only)"""
try:
# Check if current user is admin
- if currentUser.privilege != UserPrivilege.ADMIN:
+ if "admin" not in (currentUser.roleLabels or []) and "sysadmin" not in (currentUser.roleLabels or []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can reset passwords"
diff --git a/modules/routes/routeSecurityAdmin.py b/modules/routes/routeSecurityAdmin.py
index c0513ac0..4899d03a 100644
--- a/modules/routes/routeSecurityAdmin.py
+++ b/modules/routes/routeSecurityAdmin.py
@@ -25,9 +25,10 @@ router = APIRouter(
)
def _ensure_admin_scope(current_user: User, target_mandate_id: Optional[str] = None) -> None:
- if current_user.privilege not in ("admin", "sysadmin"):
+ roleLabels = current_user.roleLabels or []
+ if "admin" not in roleLabels and "sysadmin" not in roleLabels:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
- if current_user.privilege == "admin":
+ if "admin" in roleLabels and "sysadmin" not in roleLabels:
if target_mandate_id and str(target_mandate_id) != str(current_user.mandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden for target mandate")
@@ -63,7 +64,8 @@ async def list_tokens(
recordFilter["connectionId"] = connectionId
if statusFilter:
recordFilter["status"] = statusFilter
- if currentUser.privilege == "admin":
+ roleLabels = currentUser.roleLabels or []
+ if "admin" in roleLabels and "sysadmin" not in roleLabels:
recordFilter["mandateId"] = str(currentUser.mandateId)
tokens = appInterface.db.getRecordset(Token, recordFilter=recordFilter)
@@ -95,10 +97,11 @@ async def revoke_tokens_by_user(
target_mandate = target_user[0].get("mandateId") if target_user else None
_ensure_admin_scope(currentUser, target_mandate)
+ roleLabels = currentUser.roleLabels or []
count = appInterface.revokeTokensByUser(
userId=userId,
authority=AuthAuthority(authority) if authority else None,
- mandateId=None if currentUser.privilege == "sysadmin" else str(currentUser.mandateId),
+ mandateId=None if "sysadmin" in roleLabels else str(currentUser.mandateId),
revokedBy=currentUser.id,
reason=reason
)
diff --git a/modules/routes/routeSecurityLocal.py b/modules/routes/routeSecurityLocal.py
index 7b08ceed..858cf3c6 100644
--- a/modules/routes/routeSecurityLocal.py
+++ b/modules/routes/routeSecurityLocal.py
@@ -15,7 +15,7 @@ from jose import jwt
from modules.security.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.security.jwtService import createAccessToken, createRefreshToken, setAccessTokenCookie, setRefreshTokenCookie, clearAccessTokenCookie, clearRefreshTokenCookie
from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface
-from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, UserPrivilege
+from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority
from modules.datamodels.datamodelSecurity import Token
# Configure logger
@@ -212,9 +212,8 @@ async def register_user(
appInterface.mandateId = defaultMandateId
# Create user with local authentication
- # Set safe default privilege level for new registrations
+ # Set safe default role for new registrations
# New users are disabled by default and require admin approval
- from modules.datamodels.datamodelUam import UserPrivilege
user = appInterface.createUser(
username=userData.username,
password=password,
@@ -222,7 +221,7 @@ async def register_user(
fullName=userData.fullName,
language=userData.language,
enabled=False, # New users are disabled by default
- privilege=UserPrivilege.USER, # Always set to USER for new registrations
+ roleLabels=["user"], # Default role for new registrations
authenticationAuthority=AuthAuthority.LOCAL
)
diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py
index 080e8077..6ab0598a 100644
--- a/modules/routes/routeWorkflows.py
+++ b/modules/routes/routeWorkflows.py
@@ -427,8 +427,12 @@ async def delete_workflow(
# Get service center
interfaceDbChat = getServiceChat(currentUser)
- # Get raw workflow data from database to check permissions
- workflows = interfaceDbChat.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
+ # Check workflow access and permission using RBAC
+ workflows = interfaceDbChat.db.getRecordsetWithRBAC(
+ ChatWorkflow,
+ currentUser,
+ recordFilter={"id": workflowId}
+ )
if not workflows:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
diff --git a/tests/integration/rbac/test_rbac_migration.py b/tests/integration/rbac/test_rbac_migration.py
deleted file mode 100644
index 86f3eb6d..00000000
--- a/tests/integration/rbac/test_rbac_migration.py
+++ /dev/null
@@ -1,282 +0,0 @@
-"""
-Integration tests for UAM to RBAC migration.
-Tests that migration correctly converts user privileges to roleLabels.
-Uses real database connection for integration testing.
-"""
-
-import pytest
-from modules.migration.migrateUamToRbac import migrateUamToRbac, validateMigration
-from modules.datamodels.datamodelUam import UserInDB, UserPrivilege
-from modules.connectors.connectorDbPostgre import DatabaseConnector
-from modules.shared.configuration import APP_CONFIG
-
-
-@pytest.fixture(scope="class")
-def db():
- """Create real database connector for integration tests."""
- dbHost = APP_CONFIG.get("DB_HOST", "localhost")
- dbDatabase = APP_CONFIG.get("DB_DATABASE", "poweron_test")
- dbUser = APP_CONFIG.get("DB_USER", "postgres")
- dbPassword = APP_CONFIG.get("DB_PASSWORD", "")
- dbPort = APP_CONFIG.get("DB_PORT", 5432)
-
- db = DatabaseConnector(
- dbHost=dbHost,
- dbDatabase=dbDatabase,
- dbUser=dbUser,
- dbPassword=dbPassword,
- dbPort=dbPort
- )
- yield db
- db.close()
-
-
-class TestRbacMigration:
- """Test RBAC migration from UAM."""
-
- def testMigrateUserPrivilegeToRoleLabels(self, db):
- """Test that user privileges are correctly converted to roleLabels."""
- # Create test users with privileges but no roleLabels
- testUsers = [
- UserInDB(
- id="migrate_test_user1",
- username="migrate_admin",
- privilege=UserPrivilege.SYSADMIN.value
- ),
- UserInDB(
- id="migrate_test_user2",
- username="migrate_admin2",
- privilege=UserPrivilege.ADMIN.value
- ),
- UserInDB(
- id="migrate_test_user3",
- username="migrate_user1",
- privilege=UserPrivilege.USER.value
- )
- ]
-
- try:
- # Create test users in database
- for user in testUsers:
- userData = user.model_dump()
- # Ensure roleLabels is None/empty for migration test
- userData["roleLabels"] = []
- userData["id"] = user.id
- db.recordCreate(UserInDB, userData)
-
- # Run migration
- results = migrateUamToRbac(db, dryRun=False)
-
- # Check that users were updated
- assert results["usersUpdated"] == 3
-
- # Verify users were actually updated in database
- users1 = db.getRecordset(UserInDB, recordFilter={"id": "migrate_test_user1"})
- users2 = db.getRecordset(UserInDB, recordFilter={"id": "migrate_test_user2"})
- users3 = db.getRecordset(UserInDB, recordFilter={"id": "migrate_test_user3"})
- user1 = users1[0] if users1 else None
- user2 = users2[0] if users2 else None
- user3 = users3[0] if users3 else None
-
- assert user1 is not None
- assert "sysadmin" in user1.get("roleLabels", [])
-
- assert user2 is not None
- assert "admin" in user2.get("roleLabels", [])
-
- assert user3 is not None
- assert "user" in user3.get("roleLabels", [])
- finally:
- # Cleanup test users
- for user in testUsers:
- try:
- db.recordDelete(UserInDB, user.id)
- except:
- pass
-
- def testMigrationSkipsUsersWithExistingRoleLabels(self, db):
- """Test that migration skips users who already have roleLabels."""
- # Create test users: one with roleLabels, one without
- user1 = UserInDB(
- id="skip_test_user1",
- username="skip_admin",
- privilege=UserPrivilege.SYSADMIN.value,
- roleLabels=["sysadmin"] # Already migrated
- )
- user2 = UserInDB(
- id="skip_test_user2",
- username="skip_user1",
- privilege=UserPrivilege.USER.value,
- roleLabels=[] # Needs migration
- )
-
- try:
- # Create test users in database
- user1Data = user1.model_dump()
- user1Data["id"] = user1.id
- user2Data = user2.model_dump()
- user2Data["id"] = user2.id
- db.recordCreate(UserInDB, user1Data)
- db.recordCreate(UserInDB, user2Data)
-
- # Run migration
- results = migrateUamToRbac(db, dryRun=False)
-
- # Only one user should be updated (user2)
- assert results["usersUpdated"] == 1
-
- # Verify user1 still has original roleLabels
- users1 = db.getRecordset(UserInDB, recordFilter={"id": "skip_test_user1"})
- updatedUser1 = users1[0] if users1 else None
- assert updatedUser1 is not None
- assert "sysadmin" in updatedUser1.get("roleLabels", [])
-
- # Verify user2 was updated
- users2 = db.getRecordset(UserInDB, recordFilter={"id": "skip_test_user2"})
- updatedUser2 = users2[0] if users2 else None
- assert updatedUser2 is not None
- assert "user" in updatedUser2.get("roleLabels", [])
- finally:
- # Cleanup test users
- try:
- db.recordDelete(UserInDB, "skip_test_user1")
- db.recordDelete(UserInDB, "skip_test_user2")
- except:
- pass
-
- def testDryRunMode(self, db):
- """Test that dry run mode doesn't make changes."""
- # Create test user without roleLabels
- testUser = UserInDB(
- id="dryrun_test_user1",
- username="dryrun_admin",
- privilege=UserPrivilege.SYSADMIN.value,
- roleLabels=[] # Needs migration
- )
-
- try:
- # Create test user in database
- userData = testUser.model_dump()
- userData["id"] = testUser.id
- db.recordCreate(UserInDB, userData)
-
- # Get original state
- originalUsers = db.getRecordset(UserInDB, recordFilter={"id": "dryrun_test_user1"})
- originalUser = originalUsers[0] if originalUsers else None
- assert originalUser is not None
- originalRoleLabels = originalUser.get("roleLabels", [])
-
- # Run migration in dry run mode
- results = migrateUamToRbac(db, dryRun=True)
-
- # Should report what would be done
- assert results["usersUpdated"] == 1
-
- # Verify user was NOT actually updated
- unchangedUsers = db.getRecordset(UserInDB, recordFilter={"id": "dryrun_test_user1"})
- unchangedUser = unchangedUsers[0] if unchangedUsers else None
- assert unchangedUser is not None
- assert unchangedUser.get("roleLabels", []) == originalRoleLabels
- finally:
- # Cleanup test user
- try:
- db.recordDelete(UserInDB, "dryrun_test_user1")
- except:
- pass
-
- def testValidateMigrationSuccess(self, db):
- """Test validation passes when migration is successful."""
- # Create test users with roleLabels (already migrated)
- testUsers = [
- UserInDB(
- id="validate_test_user1",
- username="validate_admin",
- privilege=UserPrivilege.SYSADMIN.value,
- roleLabels=["sysadmin"]
- ),
- UserInDB(
- id="validate_test_user2",
- username="validate_admin2",
- privilege=UserPrivilege.ADMIN.value,
- roleLabels=["admin"]
- )
- ]
-
- try:
- # Create test users in database
- for user in testUsers:
- userData = user.model_dump()
- userData["id"] = user.id
- db.recordCreate(UserInDB, userData)
-
- # Ensure AccessRule table exists (migration should have created it)
- from modules.datamodels.datamodelRbac import AccessRule
- db._ensureTableExists(AccessRule)
-
- # Run validation
- validation = validateMigration(db)
-
- assert validation["valid"] == True
- assert len(validation["issues"]) == 0
- finally:
- # Cleanup test users
- for user in testUsers:
- try:
- db.recordDelete(UserInDB, user.id)
- except:
- pass
-
- def testValidateMigrationFailsWithoutRoleLabels(self, db):
- """Test validation fails when users don't have roleLabels."""
- # Create test users: one with roleLabels, one without, one with empty roleLabels
- testUsers = [
- UserInDB(
- id="validate_fail_user1",
- username="validate_fail_admin",
- privilege=UserPrivilege.SYSADMIN.value,
- roleLabels=["sysadmin"] # Has roleLabels
- ),
- UserInDB(
- id="validate_fail_user2",
- username="validate_fail_user",
- privilege=UserPrivilege.USER.value,
- roleLabels=[] # Empty roleLabels
- ),
- UserInDB(
- id="validate_fail_user3",
- username="validate_fail_user2",
- privilege=UserPrivilege.USER.value
- # Missing roleLabels field (will be None)
- )
- ]
-
- try:
- # Create test users in database
- for user in testUsers:
- userData = user.model_dump()
- userData["id"] = user.id
- # For user3, explicitly set roleLabels to None or remove it
- if user.id == "validate_fail_user3":
- if "roleLabels" in userData:
- del userData["roleLabels"]
- db.recordCreate(UserInDB, userData)
-
- # Ensure AccessRule table exists
- from modules.datamodels.datamodelRbac import AccessRule
- db._ensureTableExists(AccessRule)
-
- # Run validation
- validation = validateMigration(db)
-
- assert validation["valid"] == False
- assert len(validation["issues"]) > 0
- # Check that validation found users without roleLabels
- issuesStr = " ".join(validation["issues"])
- assert "users without roleLabels" in issuesStr or "without roleLabels" in issuesStr
- finally:
- # Cleanup test users
- for user in testUsers:
- try:
- db.recordDelete(UserInDB, user.id)
- except:
- pass
diff --git a/tests/unit/rbac/test_rbac_bootstrap.py b/tests/unit/rbac/test_rbac_bootstrap.py
index 573a4fd1..e12592a1 100644
--- a/tests/unit/rbac/test_rbac_bootstrap.py
+++ b/tests/unit/rbac/test_rbac_bootstrap.py
@@ -14,7 +14,7 @@ from modules.interfaces.interfaceBootstrap import (
createDefaultRoleRules,
createTableSpecificRules
)
-from modules.datamodels.datamodelUam import UserInDB, Mandate, UserPrivilege, AuthAuthority
+from modules.datamodels.datamodelUam import UserInDB, Mandate, AuthAuthority
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelUam import AccessLevel
@@ -62,7 +62,6 @@ class TestRbacBootstrap:
assert isinstance(user, UserInDB)
assert user.username == "admin"
assert "sysadmin" in user.roleLabels
- assert user.privilege == UserPrivilege.SYSADMIN
def testInitEventUserCreatesWithSysadminRole(self):
"""Test that initEventUser creates user with sysadmin role."""