gateway/modules/security/rbac.py
2026-02-09 23:44:52 +01:00

544 lines
21 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC interface: Core RBAC logic and permission resolution.
Multi-Tenant Design:
- AccessRules referenzieren roleId (FK), nicht roleLabel
- Rollen werden über UserMandate + UserMandateRole geladen
- Priorisierung: Instance > Mandate > Global
- Stateless Design: Kein Cache, direkt aus DB
"""
import logging
from typing import List, Optional, TYPE_CHECKING
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext, Role
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.datamodels.datamodelMembership import (
UserMandate,
UserMandateRole,
FeatureAccess,
FeatureAccessRole
)
if TYPE_CHECKING:
from modules.connectors.connectorDbPostgre import DatabaseConnector
logger = logging.getLogger(__name__)
class RbacClass:
"""
RBAC interface for permission resolution and rule validation.
Multi-Tenant Design:
- Lädt Rollen über UserMandate + UserMandateRole
- AccessRules werden über roleId gefunden
- isSysAdmin für System-Level Operationen (ohne Mandant)
"""
def __init__(self, db: "DatabaseConnector", dbApp: "DatabaseConnector"):
"""
Initialize RBAC interface with database connector.
Args:
db: Database connector for general operations (may be from any database)
dbApp: DbApp database connector for AccessRule queries.
AccessRule table is always in the DbApp database.
"""
self.db = db
self.dbApp = dbApp
def getUserPermissions(
self,
user: User,
context: AccessRuleContext,
item: str,
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None
) -> UserPermissions:
"""
Get combined permissions for a user across all their roles.
Multi-Tenant Design:
- Lädt Rollen aus UserMandate + UserMandateRole wenn mandateId gegeben
- isSysAdmin gibt vollen Zugriff, unabhängig vom Kontext
Args:
user: User object
context: Access rule context (DATA, UI, RESOURCE)
item: Item identifier (table name, UI path, resource path)
mandateId: Optional mandate context for role lookup
featureInstanceId: Optional feature instance context
Returns:
UserPermissions object with combined permissions
"""
permissions = UserPermissions(
view=False,
read=AccessLevel.NONE,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE
)
# SysAdmin hat vollen Zugriff - unabhängig vom Kontext (Mandant/Feature)
if hasattr(user, 'isSysAdmin') and user.isSysAdmin:
return UserPermissions(
view=True,
read=AccessLevel.ALL,
create=AccessLevel.ALL,
update=AccessLevel.ALL,
delete=AccessLevel.ALL
)
# Lade Role-IDs für den User via UserMandate + UserMandateRole
roleIds = self._getRoleIdsForUser(user, mandateId, featureInstanceId)
if not roleIds:
logger.debug(f"getUserPermissions: NO roles found for user={user.id}, mandateId={mandateId}, featureInstanceId={featureInstanceId}, item={item}")
return permissions
# Lade alle relevanten Regeln für alle Rollen
allRulesWithPriority = self._getRulesForRoleIds(roleIds, context, mandateId, featureInstanceId)
# Für jede Rolle die spezifischste Regel finden
# Priority order: 1) Role priority (instance > mandate > global), 2) Item specificity (exact > prefix > generic)
rolePermissions = {}
for priority, rule in allRulesWithPriority:
# Find most specific rule for this item
if self._ruleMatchesItem(rule, item):
roleId = rule.roleId
# Calculate item specificity: 0=generic (null), 1=prefix match, 2=exact match
itemSpecificity = self._getItemSpecificity(rule, item)
# Compare: first by role priority, then by item specificity
if roleId not in rolePermissions:
rolePermissions[roleId] = (priority, itemSpecificity, rule)
else:
existingPriority, existingSpecificity, _ = rolePermissions[roleId]
# Higher role priority wins, or same priority but higher specificity
if priority > existingPriority or (priority == existingPriority and itemSpecificity > existingSpecificity):
rolePermissions[roleId] = (priority, itemSpecificity, rule)
# Find highest priority among matching rules (role priority only, specificity already handled per role)
highestPriority = max((p for p, _, _ in rolePermissions.values()), default=0)
# Combine permissions ONLY from rules with highest priority
# This ensures instance-specific rules (Priority 3) override global rules (Priority 1)
for roleId, (priority, specificity, rule) in rolePermissions.items():
# Only use rules with highest priority
if priority < highestPriority:
continue
# View: union logic - if ANY role has view=true, then view=true
if rule.view:
permissions.view = True
if context == AccessRuleContext.DATA:
# For DATA context, use most permissive access level across roles at same priority
if rule.read and self._isMorePermissive(rule.read, permissions.read):
permissions.read = rule.read
if rule.create and self._isMorePermissive(rule.create, permissions.create):
permissions.create = rule.create
if rule.update and self._isMorePermissive(rule.update, permissions.update):
permissions.update = rule.update
if rule.delete and self._isMorePermissive(rule.delete, permissions.delete):
permissions.delete = rule.delete
return permissions
def _getRoleIdsForUser(
self,
user: User,
mandateId: Optional[str],
featureInstanceId: Optional[str]
) -> List[str]:
"""
Get all role IDs for a user in the given context.
Uses UserMandate + UserMandateRole for the multi-tenant model.
Each mandate has its own instances of system roles (admin, user, viewer)
which are copied from the global templates during mandate creation.
Therefore, only the requested mandate's roles are loaded - no need to
load root mandate roles separately.
Loads roles from:
1. The requested mandate (if provided) - includes mandate-instance system roles
2. Feature instance roles (if featureInstanceId provided)
Args:
user: User object
mandateId: Mandate context
featureInstanceId: Feature instance context
Returns:
List of role IDs
"""
roleIds = set() # Use set to avoid duplicates
try:
# Load roles from the requested mandate
if mandateId:
userMandateRecords = self.dbApp.getRecordset(
UserMandate,
recordFilter={"userId": user.id, "mandateId": mandateId, "enabled": True}
)
if userMandateRecords:
userMandateId = userMandateRecords[0]["id"]
# Lade UserMandateRoles (Mandate-level roles)
userMandateRoleRecords = self.dbApp.getRecordset(
UserMandateRole,
recordFilter={"userMandateId": userMandateId}
)
foundRoles = [r["roleId"] for r in userMandateRoleRecords if r.get("roleId")]
roleIds.update(foundRoles)
# Load FeatureAccess + FeatureAccessRole (Instance-level roles)
if featureInstanceId:
featureAccessRecords = self.dbApp.getRecordset(
FeatureAccess,
recordFilter={
"userId": user.id,
"featureInstanceId": featureInstanceId,
"enabled": True
}
)
if featureAccessRecords:
featureAccessId = featureAccessRecords[0]["id"]
featureAccessRoleRecords = self.dbApp.getRecordset(
FeatureAccessRole,
recordFilter={"featureAccessId": featureAccessId}
)
roleIds.update([r["roleId"] for r in featureAccessRoleRecords if r.get("roleId")])
except Exception as e:
logger.error(f"Error loading role IDs for user {user.id}: {e}")
return list(roleIds)
def getRulesForUserBulk(
self,
userId: str,
mandateId: str,
featureInstanceId: Optional[str] = None
) -> List[tuple]:
"""
Lädt alle relevanten Regeln für einen User in EINEM Query.
Stateless: Kein Cache, direkt aus DB.
Optimiert für Multi-Tenant mit Junction Tables:
- Mandant-Rollen via UserMandate → UserMandateRole
- Instanz-Rollen via FeatureAccess → FeatureAccessRole
Args:
userId: User ID
mandateId: Mandate context
featureInstanceId: Optional feature instance context
Returns:
Liste von (priority, AccessRule) Tupeln
"""
if not mandateId:
return []
try:
conn = self.dbApp.connection
roleIds = set()
# 1. Mandant-Rollen via UserMandate → UserMandateRole (SINGLE Query)
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT umr."roleId"
FROM "UserMandate" um
JOIN "UserMandateRole" umr ON umr."userMandateId" = um.id
WHERE um."userId" = %s AND um."mandateId" = %s AND um."enabled" = true
""",
(userId, mandateId)
)
mandateRoles = cursor.fetchall()
roleIds.update(r["roleId"] for r in mandateRoles if r.get("roleId"))
# 2. Instanz-Rollen via FeatureAccess → FeatureAccessRole (SINGLE Query)
if featureInstanceId:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT far."roleId"
FROM "FeatureAccess" fa
JOIN "FeatureAccessRole" far ON far."featureAccessId" = fa.id
WHERE fa."userId" = %s AND fa."featureInstanceId" = %s AND fa."enabled" = true
""",
(userId, featureInstanceId)
)
instanceRoles = cursor.fetchall()
roleIds.update(r["roleId"] for r in instanceRoles if r.get("roleId"))
if not roleIds:
return []
# 3. BULK Query: Alle Regeln für alle Rollen + zugehörige Role-Daten
# SINGLE Query mit JOIN statt N+1
roleIdsList = list(roleIds)
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT ar.*, r."mandateId" as "roleMandateId",
r."featureInstanceId" as "roleInstanceId"
FROM "AccessRule" ar
JOIN "Role" r ON ar."roleId" = r.id
WHERE ar."roleId" = ANY(%s)
""",
(roleIdsList,)
)
allRulesWithContext = cursor.fetchall()
# 4. Priorität zuweisen basierend auf Role-Scope
rulesWithPriority = []
for ruleRecord in allRulesWithContext:
ruleDict = dict(ruleRecord)
# Bestimme Priorität
if ruleDict.get("roleInstanceId"):
priority = 3 # Instance-Rolle = höchste Priorität
elif ruleDict.get("roleMandateId"):
priority = 2 # Mandate-Rolle
else:
priority = 1 # Global-Rolle = niedrigste Priorität
# Entferne Hilfsspalten vor AccessRule-Erstellung
ruleDict.pop("roleMandateId", None)
ruleDict.pop("roleInstanceId", None)
try:
rule = AccessRule(**ruleDict)
rulesWithPriority.append((priority, rule))
except Exception as e:
logger.error(f"Error converting rule record: {e}")
return rulesWithPriority
except Exception as e:
logger.error(f"Error in getRulesForUserBulk: {e}")
return []
def _getRulesForRoleIds(
self,
roleIds: List[str],
context: AccessRuleContext,
mandateId: Optional[str],
featureInstanceId: Optional[str]
) -> List[tuple]:
"""
Get all access rules for the given role IDs with priority.
Priority:
- 3: Instance-specific role (featureInstanceId set)
- 2: Mandate-specific role (mandateId set, no featureInstanceId)
- 1: Global role (no mandateId)
Args:
roleIds: List of role IDs
context: Access rule context
mandateId: Current mandate context
featureInstanceId: Current feature instance context
Returns:
List of (priority, AccessRule) tuples
"""
rulesWithPriority = []
if not roleIds:
return rulesWithPriority
try:
# Lade alle Regeln für alle Rollen
for roleId in roleIds:
rules = self.dbApp.getRecordset(
AccessRule,
recordFilter={"roleId": roleId, "context": context.value}
)
# Lade Role um Priorität zu bestimmen
roleRecords = self.dbApp.getRecordset(Role, recordFilter={"id": roleId})
if not roleRecords:
continue
# Convert to Pydantic model for type-safe access
roleDict = {k: v for k, v in roleRecords[0].items() if not k.startswith("_")}
role = Role(**roleDict)
# Bestimme Priorität basierend auf Role-Scope
if role.featureInstanceId:
priority = 3 # Instance-specific
elif role.mandateId:
priority = 2 # Mandate-specific
else:
priority = 1 # Global
for ruleRecord in rules:
try:
rule = AccessRule(**ruleRecord)
rulesWithPriority.append((priority, rule))
except Exception as e:
logger.error(f"Error converting rule record: {e}")
except Exception as e:
logger.error(f"Error loading rules for role IDs: {e}")
return rulesWithPriority
def _ruleMatchesItem(self, rule: AccessRule, item: str) -> bool:
"""
Check if a rule matches the given item.
Matching rules (in order of specificity):
1. Generic rule (item=None) matches everything
2. Exact match (rule.item == item)
3. Prefix match (item starts with rule.item + ".")
Example: rule "data.feature.trustee" matches item "data.feature.trustee.TrusteePosition"
All items MUST use the full objectKey format:
- UAM: data.uam.{TableName} (e.g., "data.uam.UserInDB")
- Chat: data.chat.{TableName} (e.g., "data.chat.ChatWorkflow")
- Files: data.files.{TableName} (e.g., "data.files.FileItem")
- Automation: data.automation.{TableName} (e.g., "data.automation.AutomationDefinition")
- Feature: data.feature.{featureCode}.{TableName} (e.g., "data.feature.trustee.TrusteePosition")
- UI: ui.{area}.{page} (e.g., "ui.admin.users")
Args:
rule: Access rule to check
item: Full objectKey to match against
Returns:
True if rule matches item
"""
if rule.item is None:
# Generic rule matches everything
return True
if not item:
# No item specified, only generic rules match
return rule.item is None
# Exact match
if rule.item == item:
return True
# Prefix match (e.g., "data.feature.trustee" matches "data.feature.trustee.TrusteePosition")
if item.startswith(rule.item + "."):
return True
return False
def _getItemSpecificity(self, rule: AccessRule, item: str) -> int:
"""
Calculate the specificity of a rule's item match.
Returns:
0 = generic rule (item=None)
1 = prefix match
2 = exact match
"""
if rule.item is None:
return 0 # Generic rule - lowest specificity
if rule.item == item:
return 2 # Exact match - highest specificity
if item and item.startswith(rule.item + "."):
return 1 # Prefix match - medium specificity
return 0 # No match (shouldn't happen if _ruleMatchesItem was true)
def findMostSpecificRule(self, rules: List[AccessRule], item: str) -> Optional[AccessRule]:
"""
Find the most specific rule for an item (longest matching prefix wins).
Args:
rules: List of access rules to search
item: Item identifier to match
Returns:
Most specific matching rule, or None if no match
"""
if not item:
# If no item specified, return generic rule (item = null)
genericRules = [r for r in rules if r.item is None]
return genericRules[0] if genericRules else None
# Find longest matching prefix
bestMatch = None
bestMatchLength = -1
for rule in rules:
if rule.item is None:
# Generic rule - use as fallback if no specific match found
if bestMatch is None:
bestMatch = rule
elif rule.item == item:
# Exact match - most specific
return rule
elif item.startswith(rule.item + "."):
# Prefix match - check if it's longer than current best
matchLength = len(rule.item.split("."))
if matchLength > bestMatchLength:
bestMatch = rule
bestMatchLength = matchLength
return bestMatch
def validateAccessRule(self, rule: AccessRule) -> bool:
"""
Validate that CUD permissions are allowed by read permission level (only for DATA context).
Args:
rule: AccessRule to validate
Returns:
True if rule is valid, False otherwise
"""
if rule.context != AccessRuleContext.DATA:
# For UI and RESOURCE contexts, only view is relevant
return True
if rule.read is None:
return False # DATA context requires read permission
readLevel = AccessLevel(rule.read)
# CUD operations are only allowed if read permission exists
for operation in [rule.create, rule.update, rule.delete]:
if operation is None or operation == AccessLevel.NONE.value:
continue # No access is always valid
if readLevel == AccessLevel.NONE:
return False # No CUD allowed if no read access
if readLevel == AccessLevel.MY and operation not in [AccessLevel.NONE.value, AccessLevel.MY.value]:
return False
if readLevel == AccessLevel.GROUP and operation not in [AccessLevel.NONE.value, AccessLevel.MY.value, AccessLevel.GROUP.value]:
return False
return True
def _isMorePermissive(self, level1: AccessLevel, level2: AccessLevel) -> bool:
"""
Check if level1 is more permissive than level2.
Args:
level1: First access level
level2: Second access level
Returns:
True if level1 is more permissive than level2
"""
hierarchy = {
AccessLevel.NONE: 0,
AccessLevel.MY: 1,
AccessLevel.GROUP: 2,
AccessLevel.ALL: 3
}
return hierarchy.get(level1, 0) > hierarchy.get(level2, 0)