# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC interface: Core RBAC logic and permission resolution.
Moved from interfaces to security module to maintain proper architectural layering.
Connectors can import from security, but not from interfaces.
"""

import logging
from typing import List, Optional, Dict, Any, TYPE_CHECKING

from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel

if TYPE_CHECKING:
    # Needed only for the quoted "DatabaseConnector" annotations below, so the
    # connector module is not imported at runtime.
    from modules.connectors.connectorDbPostgre import DatabaseConnector

# Module-level logger shared by all RBAC helpers in this file.
logger = logging.getLogger(__name__)


class RbacClass:
    """
    RBAC interface for permission resolution and rule validation.

    Resolves a user's effective permissions from the AccessRule records of
    each of the user's roles and validates individual access rules.
    """

    def __init__(self, db: "DatabaseConnector", dbApp: "DatabaseConnector"):
        """
        Initialize RBAC interface with database connector.

        Args:
            db: Database connector for general operations (may be from any database)
            dbApp: DbApp database connector for AccessRule queries.
                AccessRule table is always in the DbApp database.
        """
        self.db = db
        self.dbApp = dbApp

    def getUserPermissions(self, user: User, context: AccessRuleContext, item: str) -> UserPermissions:
        """
        Get combined permissions for a user across all their roles.

        Within a role the most specific matching rule wins; across roles the
        results are combined with opening (union) logic: view is granted if
        any role grants it and, for the DATA context only, each of
        read/create/update/delete takes the most permissive level found.

        Args:
            user: User object with roleLabels
            context: Access rule context (DATA, UI, RESOURCE)
            item: Item identifier (table name, UI path, resource path)

        Returns:
            UserPermissions object with combined permissions. A user without
            roleLabels gets view=False and AccessLevel.NONE everywhere.
        """
        permissions = UserPermissions(
            view=False,
            read=AccessLevel.NONE,
            create=AccessLevel.NONE,
            update=AccessLevel.NONE,
            delete=AccessLevel.NONE,
        )

        roleLabels = getattr(user, "roleLabels", None)
        if not roleLabels:
            return permissions

        isDataContext = context == AccessRuleContext.DATA

        for roleLabel in roleLabels:
            # Most specific rule wins within a single role.
            roleRules = self._getRulesForRole(roleLabel, context)
            rule = self.findMostSpecificRule(roleRules, item)
            if not rule:
                continue

            # View: union logic - if ANY role has view=true, then view=true.
            if rule.view:
                permissions.view = True

            # Access levels are only meaningful for the DATA context.
            if not isDataContext:
                continue

            # Keep the most permissive level seen so far for each operation.
            for operation in ("read", "create", "update", "delete"):
                candidate = getattr(rule, operation)
                if candidate and self._isMorePermissive(candidate, getattr(permissions, operation)):
                    setattr(permissions, operation, candidate)

        return permissions

    def findMostSpecificRule(self, rules: List[AccessRule], item: str) -> Optional[AccessRule]:
        """
        Find the most specific rule for an item (longest matching prefix wins).

        Matching is done on whole dot-separated segments: a rule for "a.b"
        matches "a.b" and "a.b.c", but a rule for "ab" does not match "a.b".

        Args:
            rules: List of access rules to search
            item: Item identifier to match

        Returns:
            The exact match if one exists, otherwise the rule with the longest
            matching prefix, otherwise the first generic rule (item is None),
            or None if nothing matches.
        """
        if not item:
            # No item specified: only a generic rule (item = null) can apply.
            return next((rule for rule in rules if rule.item is None), None)

        genericRule = None
        prefixRule = None
        prefixDepth = -1

        for rule in rules:
            ruleItem = rule.item
            if ruleItem is None:
                # Remember the first generic rule as fallback.
                if genericRule is None:
                    genericRule = rule
            elif ruleItem == item:
                # Exact match - nothing can be more specific.
                return rule
            elif item.startswith(ruleItem + "."):
                # Prefix match on a segment boundary; deeper prefixes win.
                depth = ruleItem.count(".") + 1
                if depth > prefixDepth:
                    prefixRule = rule
                    prefixDepth = depth

        return prefixRule if prefixRule is not None else genericRule

    def validateAccessRule(self, rule: AccessRule) -> bool:
        """
        Validate that CUD permissions are allowed by read permission level (only for DATA context).

        Args:
            rule: AccessRule to validate

        Returns:
            True if rule is valid, False otherwise
        """
        if rule.context != AccessRuleContext.DATA:
            # For UI and RESOURCE contexts, only view is relevant.
            return True

        if rule.read is None:
            return False  # DATA context requires read permission

        readLevel = AccessLevel(rule.read)
        noneValue = AccessLevel.NONE.value
        myValue = AccessLevel.MY.value
        groupValue = AccessLevel.GROUP.value

        # CUD operations must not exceed what the read level allows.
        for operation in (rule.create, rule.update, rule.delete):
            if operation is None:
                continue  # No access is always valid
            # Accept both enum members and raw values by comparing raw values.
            level = getattr(operation, "value", operation)
            if level == noneValue:
                continue  # No access is always valid
            if readLevel == AccessLevel.NONE:
                return False  # No CUD allowed if no read access
            if readLevel == AccessLevel.MY and level != myValue:
                return False
            if readLevel == AccessLevel.GROUP and level not in (myValue, groupValue):
                return False

        return True

    def _isMorePermissive(self, level1: AccessLevel, level2: AccessLevel) -> bool:
        """
        Check if level1 is more permissive than level2.

        Args:
            level1: First access level
            level2: Second access level

        Returns:
            True if level1 is more permissive than level2. Levels missing from
            the hierarchy are ranked like AccessLevel.NONE.
        """
        hierarchy = {
            AccessLevel.NONE: 0,
            AccessLevel.MY: 1,
            AccessLevel.GROUP: 2,
            AccessLevel.ALL: 3,
        }
        return hierarchy.get(level1, 0) > hierarchy.get(level2, 0)

    def _getRulesForRole(self, roleLabel: str, context: AccessRuleContext) -> List[AccessRule]:
        """
        Get all access rules for a specific role and context.
        Always queries from DbApp database, not the current database.

        Args:
            roleLabel: Role label to get rules for
            context: Context type

        Returns:
            List of AccessRule objects. Records that cannot be converted are
            logged and skipped; if the lookup itself fails the error is
            logged and an empty list is returned.
        """
        try:
            # Always use DbApp database for AccessRule queries
            records = self.dbApp.getRecordset(
                AccessRule,
                recordFilter={
                    "roleLabel": roleLabel,
                    "context": context.value,
                },
            )

            # Convert dict records to AccessRule objects
            accessRules = []
            for record in records:
                try:
                    accessRules.append(AccessRule(**record))
                except Exception as e:
                    logger.error(f"Error converting rule record to AccessRule: {e}, record={record}")

            return accessRules
        except Exception as e:
            # The handler must never raise itself: fall back to the raw context
            # when it has no ``value`` attribute (e.g. a plain string was passed).
            contextLabel = getattr(context, "value", context)
            logger.error(f"Error getting rules for role {roleLabel} and context {contextLabel}: {e}", exc_info=True)
            return []