# gateway/modules/security/rbac.py
# 2025-12-07 23:51:05 +01:00
#
# 212 lines
# 8.1 KiB
# Python

"""
RBAC interface: Core RBAC logic and permission resolution.
Moved from interfaces to security module to maintain proper architectural layering.
Connectors can import from security, but not from interfaces.
"""
import logging
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
if TYPE_CHECKING:
    # Imported for static type checkers only. The connector type is referenced
    # through the string annotation "DatabaseConnector", so nothing is imported
    # from the connectors package at runtime (presumably to avoid a circular
    # import, since connectors import from this security module - confirm).
    from modules.connectors.connectorDbPostgre import DatabaseConnector

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class RbacClass:
    """
    RBAC interface for permission resolution and rule validation.

    Rules are resolved in two stages: within a single role the most specific
    rule for an item wins; across roles the results are merged permissively
    (union of ``view``, highest access level per operation).
    """

    def __init__(self, db: "DatabaseConnector", dbApp: "DatabaseConnector"):
        """
        Initialize RBAC interface with database connectors.

        Args:
            db: Database connector for general operations (may be from any database)
            dbApp: DbApp database connector for AccessRule queries.
                AccessRule table is always in the DbApp database.
        """
        self.db = db
        self.dbApp = dbApp

    def getUserPermissions(self, user: User, context: AccessRuleContext, item: str) -> UserPermissions:
        """
        Get combined permissions for a user across all their roles.

        Args:
            user: User object with roleLabels
            context: Access rule context (DATA, UI, RESOURCE)
            item: Item identifier (table name, UI path, resource path)

        Returns:
            UserPermissions object with combined permissions. When the user has
            no role labels, or no rule matches, everything stays denied
            (view=False, all access levels NONE).
        """
        permissions = UserPermissions(
            view=False,
            read=AccessLevel.NONE,
            create=AccessLevel.NONE,
            update=AccessLevel.NONE,
            delete=AccessLevel.NONE
        )

        roleLabels = getattr(user, "roleLabels", None)
        if not roleLabels:
            return permissions

        isDataContext = context == AccessRuleContext.DATA

        # dict.fromkeys() drops duplicate labels while keeping their order, so a
        # label listed twice triggers only one rule query.
        for roleLabel in dict.fromkeys(roleLabels):
            # Within one role: the most specific matching rule wins.
            rule = self.findMostSpecificRule(self._getRulesForRole(roleLabel, context), item)
            if not rule:
                continue

            # Across roles: union logic - if ANY role has view=true, then view=true.
            if rule.view:
                permissions.view = True

            # Access levels are only meaningful for the DATA context.
            if not isDataContext:
                continue

            # Keep the most permissive level seen so far for each operation.
            for operation in ("read", "create", "update", "delete"):
                candidate = getattr(rule, operation)
                if candidate and self._isMorePermissive(candidate, getattr(permissions, operation)):
                    setattr(permissions, operation, candidate)

        return permissions

    def findMostSpecificRule(self, rules: List[AccessRule], item: str) -> Optional[AccessRule]:
        """
        Find the most specific rule for an item (longest matching prefix wins).

        Items are dot-separated paths. An exact match beats every prefix match,
        a prefix only matches on a dot boundary ("a.b" matches "a.b.c" but not
        "a.bc"), and a generic rule (item is None) is used only when nothing
        more specific matches. Ties are resolved in favour of the first rule
        in ``rules``.

        Args:
            rules: List of access rules to search
            item: Item identifier to match

        Returns:
            Most specific matching rule, or None if no match
        """
        if not item:
            # No item specified: only a generic rule (item = null) can apply.
            return next((rule for rule in rules if rule.item is None), None)

        bestMatch = None
        bestMatchLength = -1  # -1 means "nothing better than a generic rule yet"

        for rule in rules:
            if rule.item is None:
                # Generic rule - fallback only; never replaces an earlier match.
                if bestMatch is None:
                    bestMatch = rule
            elif rule.item == item:
                # Exact match - nothing can be more specific.
                return rule
            elif item.startswith(rule.item + "."):
                # Prefix match - specificity is the number of path segments.
                matchLength = len(rule.item.split("."))
                if matchLength > bestMatchLength:
                    bestMatch = rule
                    bestMatchLength = matchLength

        return bestMatch

    def validateAccessRule(self, rule: AccessRule) -> bool:
        """
        Validate that CUD permissions are allowed by read permission level (only for DATA context).

        Levels are normalised through ``AccessLevel(...)`` first, so the check
        behaves the same whether the rule stores enum members or raw values.

        Args:
            rule: AccessRule to validate

        Returns:
            True if rule is valid, False otherwise. A DATA rule is invalid when
            read is missing, when any level is not a recognised AccessLevel, or
            when create/update/delete is wider than the read level allows.
        """
        if rule.context != AccessRuleContext.DATA:
            # For UI and RESOURCE contexts, only view is relevant
            return True

        if rule.read is None:
            return False  # DATA context requires read permission

        readLevel = self._coerceLevel(rule.read)
        if readLevel is None:
            return False  # Unrecognised read level cannot be validated

        # CUD levels permitted for each read level; a read level that is not
        # listed here (ALL) places no restriction on CUD.
        allowedByRead = {
            AccessLevel.NONE: (AccessLevel.NONE,),
            AccessLevel.MY: (AccessLevel.NONE, AccessLevel.MY),
            AccessLevel.GROUP: (AccessLevel.NONE, AccessLevel.MY, AccessLevel.GROUP),
        }
        allowed = allowedByRead.get(readLevel)

        for operation in (rule.create, rule.update, rule.delete):
            if operation is None:
                continue  # No access is always valid
            operationLevel = self._coerceLevel(operation)
            if operationLevel is None:
                return False  # Unrecognised level is never valid
            if allowed is not None and operationLevel not in allowed:
                return False

        return True

    @staticmethod
    def _coerceLevel(level: Any) -> Optional[AccessLevel]:
        """
        Normalise an AccessLevel member or raw value to an AccessLevel member.

        Args:
            level: AccessLevel member, raw enum value, or anything else

        Returns:
            The matching AccessLevel member, or None if there is none
        """
        try:
            return AccessLevel(level)
        except (ValueError, TypeError):
            return None

    def _isMorePermissive(self, level1: AccessLevel, level2: AccessLevel) -> bool:
        """
        Check if level1 is more permissive than level2.

        Both arguments may be AccessLevel members or raw values; anything that
        is not a recognised level ranks like NONE.

        Args:
            level1: First access level
            level2: Second access level

        Returns:
            True if level1 is more permissive than level2
        """
        hierarchy = {
            AccessLevel.NONE: 0,
            AccessLevel.MY: 1,
            AccessLevel.GROUP: 2,
            AccessLevel.ALL: 3
        }
        rank1 = hierarchy.get(self._coerceLevel(level1), 0)
        rank2 = hierarchy.get(self._coerceLevel(level2), 0)
        return rank1 > rank2

    def _getRulesForRole(self, roleLabel: str, context: AccessRuleContext) -> List[AccessRule]:
        """
        Get all access rules for a specific role and context.
        Always queries from DbApp database, not the current database.

        This lookup is best-effort: a failing query is logged and yields an
        empty list, and a record that cannot be converted is logged and skipped.

        Args:
            roleLabel: Role label to get rules for
            context: Context type

        Returns:
            List of AccessRule objects
        """
        try:
            # Always use DbApp database for AccessRule queries
            records = self.dbApp.getRecordset(
                AccessRule,
                recordFilter={
                    "roleLabel": roleLabel,
                    "context": context.value
                }
            )

            # Convert dict records to AccessRule objects
            accessRules = []
            for record in records:
                try:
                    accessRules.append(AccessRule(**record))
                except Exception as e:
                    # One malformed record must not hide the remaining rules.
                    logger.error(f"Error converting rule record to AccessRule: {e}, record={record}")
            return accessRules
        except Exception as e:
            logger.error(f"Error getting rules for role {roleLabel} and context {context.value}: {e}", exc_info=True)
            return []