# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC interface: Core RBAC logic and permission resolution.

Multi-Tenant Design:
- AccessRules reference roleId (FK), not roleLabel
- Roles are loaded via UserMandate + UserMandateRole
- Prioritization: Instance > Mandate > Global
- Stateless design: no cache, read directly from the DB
"""

import logging
from typing import Dict, List, Optional, TYPE_CHECKING

from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext, Role
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.datamodels.datamodelMembership import (
    UserMandate,
    UserMandateRole,
    FeatureAccess,
    FeatureAccessRole
)

if TYPE_CHECKING:
    from modules.connectors.connectorDbPostgre import DatabaseConnector

logger = logging.getLogger(__name__)


class RbacClass:
    """
    RBAC interface for permission resolution and rule validation.

    Multi-Tenant Design:
    - Loads roles via UserMandate + UserMandateRole
    - AccessRules are looked up by roleId
    - isSysAdmin for system-level operations (without mandate)
    """

    # Rank of each access level; a higher rank is more permissive.
    _LEVEL_RANK = {
        AccessLevel.NONE: 0,
        AccessLevel.MY: 1,
        AccessLevel.GROUP: 2,
        AccessLevel.ALL: 3
    }

    def __init__(self, db: "DatabaseConnector", dbApp: "DatabaseConnector"):
        """
        Initialize RBAC interface with database connector.

        Args:
            db: Database connector for general operations (may be from any database)
            dbApp: DbApp database connector for AccessRule queries.
                   AccessRule table is always in the DbApp database.
        """
        self.db = db
        self.dbApp = dbApp

    def getUserPermissions(
        self,
        user: User,
        context: AccessRuleContext,
        item: str,
        mandateId: Optional[str] = None,
        featureInstanceId: Optional[str] = None
    ) -> UserPermissions:
        """
        Get combined permissions for a user across all their roles.

        Multi-Tenant Design:
        - Loads roles from UserMandate + UserMandateRole when mandateId is given
        - isSysAdmin grants full access, independent of the context
          (restricted to MY for user-owned namespaces, see below)

        Args:
            user: User object
            context: Access rule context (DATA, UI, RESOURCE)
            item: Item identifier (table name, UI path, resource path)
            mandateId: Optional mandate context for role lookup
            featureInstanceId: Optional feature instance context

        Returns:
            UserPermissions object with combined permissions
        """
        permissions = UserPermissions(
            view=False,
            read=AccessLevel.NONE,
            create=AccessLevel.NONE,
            update=AccessLevel.NONE,
            delete=AccessLevel.NONE
        )

        # Category A: isSysAdmin FLAG bypass (safety net for system operations)
        # NOTE: sysadmin ROLE users get full access via AccessRules (DATA: ALL)
        # This flag bypass is kept as fallback for true system-level operations
        if getattr(user, "isSysAdmin", False):
            # User-owned namespaces: SysAdmin gets MY access only (own data).
            # Every user -- including SysAdmin -- only has CRUD for their own
            # chat workflows and files. Automation is excluded because it's
            # managed by admins and the system event user needs ALL access.
            _USER_OWNED_PREFIXES = ("data.chat.", "data.files.")
            if item and item.startswith(_USER_OWNED_PREFIXES):
                return UserPermissions(
                    view=True,
                    read=AccessLevel.MY,
                    create=AccessLevel.MY,
                    update=AccessLevel.MY,
                    delete=AccessLevel.MY
                )
            return UserPermissions(
                view=True,
                read=AccessLevel.ALL,
                create=AccessLevel.ALL,
                update=AccessLevel.ALL,
                delete=AccessLevel.ALL
            )

        # Load the role IDs for the user via UserMandate + UserMandateRole
        roleIds = self._getRoleIdsForUser(user, mandateId, featureInstanceId)
        if not roleIds:
            logger.debug(f"getUserPermissions: NO roles found for user={user.id}, mandateId={mandateId}, featureInstanceId={featureInstanceId}, item={item}")
            return permissions

        # Load all relevant rules for all roles
        allRulesWithPriority = self._getRulesForRoleIds(roleIds, context, mandateId, featureInstanceId)

        # For each role pick its single best rule for this item.
        # Order: 1) role priority (instance > mandate > global),
        #        2) item specificity (exact > prefix > generic).
        bestPerRole = self._selectBestRulePerRole(allRulesWithPriority, item)

        # Highest role priority among the selected rules (specificity was
        # already resolved per role).
        highestPriority = max((priority for priority, _ in bestPerRole.values()), default=0)

        # Combine permissions ONLY from rules with the highest priority.
        # This ensures instance-specific rules (priority 3) override global
        # rules (priority 1).
        for priority, rule in bestPerRole.values():
            if priority < highestPriority:
                continue

            # View: union logic - if ANY role has view=true, then view=true
            if rule.view:
                permissions.view = True

            if context == AccessRuleContext.DATA:
                # For DATA context, use the most permissive access level
                # across roles at the same priority.
                for operation in ("read", "create", "update", "delete"):
                    level = getattr(rule, operation)
                    if level and self._isMorePermissive(level, getattr(permissions, operation)):
                        setattr(permissions, operation, level)

        return permissions

    def checkResourceAccessBulk(
        self,
        user: User,
        resourcePaths: List[str],
        mandateId: Optional[str] = None,
        featureInstanceId: Optional[str] = None
    ) -> Dict[str, bool]:
        """
        Check view access for multiple RESOURCE paths in one pass.

        Uses the same rule selection as getUserPermissions but loads roles
        and rules only once for all paths.

        Args:
            user: User object
            resourcePaths: Resource paths to check
            mandateId: Optional mandate context for role lookup
            featureInstanceId: Optional feature instance context

        Returns:
            {path: has_view} for every entry of resourcePaths
        """
        result = {p: False for p in resourcePaths}
        if not resourcePaths:
            return result

        # SysAdmin bypass
        if getattr(user, "isSysAdmin", False):
            return {p: True for p in resourcePaths}

        roleIds = self._getRoleIdsForUser(user, mandateId, featureInstanceId)
        if not roleIds:
            return result

        rulesWithPriority = self._getRulesForRoleIds(
            roleIds, AccessRuleContext.RESOURCE, mandateId, featureInstanceId
        )

        for path in resourcePaths:
            bestPerRole = self._selectBestRulePerRole(rulesWithPriority, path)
            highestPriority = max((priority for priority, _ in bestPerRole.values()), default=0)
            # Only rules at the highest role priority may grant view.
            if any(
                priority >= highestPriority and rule.view
                for priority, rule in bestPerRole.values()
            ):
                result[path] = True

        return result

    def _selectBestRulePerRole(self, rulesWithPriority: List[tuple], item: str) -> Dict[str, tuple]:
        """
        Pick, for every role, the single best rule matching the item.

        Candidates are ranked by (role priority, item specificity, length of
        the rule's item). The last component makes the choice between several
        prefix rules of one role deterministic: the longest prefix wins, in
        line with findMostSpecificRule. On a full tie the first rule
        encountered is kept.

        Args:
            rulesWithPriority: List of (priority, AccessRule) tuples
            item: Full objectKey to match against

        Returns:
            Dict mapping roleId to (priority, AccessRule)
        """
        ranked = {}
        for priority, rule in rulesWithPriority:
            if not self._ruleMatchesItem(rule, item):
                continue
            rank = (
                priority,
                self._getItemSpecificity(rule, item),
                len(rule.item) if rule.item else 0
            )
            current = ranked.get(rule.roleId)
            if current is None or rank > current[0]:
                ranked[rule.roleId] = (rank, rule)
        return {roleId: (rank[0], rule) for roleId, (rank, rule) in ranked.items()}

    def _addJunctionRoleIds(self, roleIds: set, junctionModel, foreignKey: str, parentIds) -> None:
        """
        Add the role IDs of junction records belonging to the given parents.

        Issues one getRecordset call per parent ID and adds every non-empty
        "roleId" to the accumulator. The accumulator is filled in place so
        that roles collected before a failing query are kept by the caller.

        Args:
            roleIds: Accumulator set, modified in place
            junctionModel: Junction table model (UserMandateRole / FeatureAccessRole)
            foreignKey: Name of the junction column referencing the parent
            parentIds: Iterable of parent record IDs
        """
        for parentId in parentIds:
            records = self.dbApp.getRecordset(
                junctionModel,
                recordFilter={foreignKey: parentId}
            )
            roleIds.update(r["roleId"] for r in records if r.get("roleId"))

    def _getRoleIdsForUser(
        self,
        user: User,
        mandateId: Optional[str],
        featureInstanceId: Optional[str]
    ) -> List[str]:
        """
        Get all role IDs for a user in the given context.

        Uses UserMandate + UserMandateRole for the multi-tenant model.

        Each mandate has its own instances of system roles (admin, user, viewer)
        which are copied from the global templates during mandate creation.
        Therefore, only the requested mandate's roles are loaded - no need to
        load root mandate roles separately.

        Loads roles from:
        1. The requested mandate (if provided) - includes mandate-instance system roles
        2. Feature instance roles (if featureInstanceId provided)

        Without mandateId, roles of ALL enabled mandates of the user are
        loaded; without any context, roles of ALL enabled feature accesses
        are loaded as well.

        Args:
            user: User object
            mandateId: Mandate context
            featureInstanceId: Feature instance context

        Returns:
            List of role IDs (unordered, without duplicates)
        """
        roleIds = set()  # Use set to avoid duplicates

        # NOTE(review): on a DB error the roles collected so far are still
        # returned (error is only logged) - confirm that resolving
        # permissions from a partial role set is intended.
        try:
            if mandateId:
                # Specific mandate context: load roles from that mandate only
                userMandateRecords = self.dbApp.getRecordset(
                    UserMandate,
                    recordFilter={"userId": user.id, "mandateId": mandateId, "enabled": True}
                )
                # Only the first membership record is considered
                userMandateIds = [userMandateRecords[0]["id"]] if userMandateRecords else []
            else:
                # No mandate context: load roles from ALL user's mandates.
                # Required for user-owned namespaces (files, chat, automation) that
                # are accessed without mandate context (e.g., /api/files/ endpoints).
                # Data isolation is still enforced by sysCreatedBy WHERE clause.
                allUserMandates = self.dbApp.getRecordset(
                    UserMandate,
                    recordFilter={"userId": user.id, "enabled": True}
                )
                userMandateIds = (um["id"] for um in allUserMandates)

            # Load UserMandateRoles (mandate-level roles)
            self._addJunctionRoleIds(roleIds, UserMandateRole, "userMandateId", userMandateIds)

            # Load FeatureAccess + FeatureAccessRole (instance-level roles)
            featureAccessIds = []
            if featureInstanceId:
                # Specific feature instance: load roles from that instance only
                featureAccessRecords = self.dbApp.getRecordset(
                    FeatureAccess,
                    recordFilter={
                        "userId": user.id,
                        "featureInstanceId": featureInstanceId,
                        "enabled": True
                    }
                )
                if featureAccessRecords:
                    featureAccessIds = [featureAccessRecords[0]["id"]]
            elif not mandateId:
                # No context at all: also load feature-instance roles from ALL user's accesses.
                # Same rationale: user-owned data needs roles for permission resolution.
                allFeatureAccess = self.dbApp.getRecordset(
                    FeatureAccess,
                    recordFilter={"userId": user.id, "enabled": True}
                )
                featureAccessIds = (fa["id"] for fa in allFeatureAccess)

            self._addJunctionRoleIds(roleIds, FeatureAccessRole, "featureAccessId", featureAccessIds)

        except Exception as e:
            logger.error(f"Error loading role IDs for user {user.id}: {e}")

        return list(roleIds)

    @staticmethod
    def _scopePriority(featureInstanceId, mandateId) -> int:
        """
        Map a role's scope to its priority.

        Returns:
            3 = instance-specific role (featureInstanceId set)
            2 = mandate-specific role (mandateId set, no featureInstanceId)
            1 = global role (neither set)
        """
        if featureInstanceId:
            return 3
        if mandateId:
            return 2
        return 1

    def getRulesForUserBulk(
        self,
        userId: str,
        mandateId: str,
        featureInstanceId: Optional[str] = None
    ) -> List[tuple]:
        """
        Load all rules of a user's roles with a fixed number of queries.

        Stateless: no cache, read directly from the DB.

        Optimized for multi-tenant with junction tables (JOINs instead of N+1):
        - Mandate roles via UserMandate -> UserMandateRole
        - Instance roles via FeatureAccess -> FeatureAccessRole

        Rules are not filtered by context or item here.

        Args:
            userId: User ID
            mandateId: Mandate context
            featureInstanceId: Optional feature instance context

        Returns:
            List of (priority, AccessRule) tuples; empty if mandateId is
            missing, no roles are found, or a query fails
        """
        if not mandateId:
            return []

        try:
            conn = self.dbApp.connection
            roleIds = set()

            # 1. Mandate roles via UserMandate -> UserMandateRole (single query)
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT umr."roleId"
                    FROM "UserMandate" um
                    JOIN "UserMandateRole" umr ON umr."userMandateId" = um.id
                    WHERE um."userId" = %s AND um."mandateId" = %s AND um."enabled" = true
                    """,
                    (userId, mandateId)
                )
                mandateRoles = cursor.fetchall()
                # NOTE(review): rows are accessed like dicts - assumes the
                # connection uses a dict-style cursor; confirm in the connector.
                roleIds.update(r["roleId"] for r in mandateRoles if r.get("roleId"))

            # 2. Instance roles via FeatureAccess -> FeatureAccessRole (single query)
            if featureInstanceId:
                with conn.cursor() as cursor:
                    cursor.execute(
                        """
                        SELECT far."roleId"
                        FROM "FeatureAccess" fa
                        JOIN "FeatureAccessRole" far ON far."featureAccessId" = fa.id
                        WHERE fa."userId" = %s AND fa."featureInstanceId" = %s AND fa."enabled" = true
                        """,
                        (userId, featureInstanceId)
                    )
                    instanceRoles = cursor.fetchall()
                    roleIds.update(r["roleId"] for r in instanceRoles if r.get("roleId"))

            if not roleIds:
                return []

            # 3. Bulk query: all rules of all roles plus the role's scope columns
            #    (single query with JOIN instead of N+1)
            roleIdsList = list(roleIds)
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT ar.*, r."mandateId" as "roleMandateId", r."featureInstanceId" as "roleInstanceId"
                    FROM "AccessRule" ar
                    JOIN "Role" r ON ar."roleId" = r.id
                    WHERE ar."roleId" = ANY(%s)
                    """,
                    (roleIdsList,)
                )
                allRulesWithContext = cursor.fetchall()

            # 4. Assign the priority based on the role's scope
            rulesWithPriority = []
            for ruleRecord in allRulesWithContext:
                ruleDict = dict(ruleRecord)

                # Remove the helper columns before building the AccessRule
                roleMandateId = ruleDict.pop("roleMandateId", None)
                roleInstanceId = ruleDict.pop("roleInstanceId", None)
                priority = self._scopePriority(roleInstanceId, roleMandateId)

                try:
                    rule = AccessRule(**ruleDict)
                    rulesWithPriority.append((priority, rule))
                except Exception as e:
                    # A single malformed record must not discard the others
                    logger.error(f"Error converting rule record: {e}")

            return rulesWithPriority

        except Exception as e:
            logger.error(f"Error in getRulesForUserBulk: {e}")
            return []

    def _getRulesForRoleIds(
        self,
        roleIds: List[str],
        context: AccessRuleContext,
        mandateId: Optional[str],
        featureInstanceId: Optional[str]
    ) -> List[tuple]:
        """
        Get all access rules for the given role IDs with priority.

        Priority:
        - 3: Instance-specific role (featureInstanceId set)
        - 2: Mandate-specific role (mandateId set, no featureInstanceId)
        - 1: Global role (no mandateId)

        Roles that cannot be found are skipped. On a DB error the rules
        collected so far are returned and the error is logged.

        Args:
            roleIds: List of role IDs
            context: Access rule context
            mandateId: Current mandate context (not used for the lookup)
            featureInstanceId: Current feature instance context (not used for the lookup)

        Returns:
            List of (priority, AccessRule) tuples
        """
        rulesWithPriority = []
        if not roleIds:
            return rulesWithPriority

        try:
            for roleId in roleIds:
                # Load the role first: it determines the priority, and rules
                # of an unknown role are not needed at all.
                roleRecords = self.dbApp.getRecordset(Role, recordFilter={"id": roleId})
                if not roleRecords:
                    continue

                # Convert to Pydantic model for type-safe access
                roleDict = {k: v for k, v in roleRecords[0].items() if not k.startswith("_")}
                role = Role(**roleDict)
                priority = self._scopePriority(role.featureInstanceId, role.mandateId)

                rules = self.dbApp.getRecordset(
                    AccessRule,
                    recordFilter={"roleId": roleId, "context": context.value}
                )
                for ruleRecord in rules:
                    try:
                        rule = AccessRule(**ruleRecord)
                        rulesWithPriority.append((priority, rule))
                    except Exception as e:
                        # A single malformed record must not discard the others
                        logger.error(f"Error converting rule record: {e}")

        except Exception as e:
            logger.error(f"Error loading rules for role IDs: {e}")

        return rulesWithPriority

    def _ruleMatchesItem(self, rule: AccessRule, item: str) -> bool:
        """
        Check if a rule matches the given item.

        Matching rules (in order of specificity):
        1. Generic rule (item=None) matches everything
        2. Exact match (rule.item == item)
        3. Prefix match (item starts with rule.item + ".")
           Example: rule "data.feature.trustee" matches item "data.feature.trustee.TrusteePosition"

        All items MUST use the full objectKey format:
        - UAM: data.uam.{TableName} (e.g., "data.uam.UserInDB")
        - Chat: data.chat.{TableName} (e.g., "data.chat.ChatWorkflow")
        - Files: data.files.{TableName} (e.g., "data.files.FileItem")
        - Automation: data.automation.{TableName} (e.g., "data.automation.AutomationDefinition")
        - Feature: data.feature.{featureCode}.{TableName} (e.g., "data.feature.trustee.TrusteePosition")
        - UI: ui.{area}.{page} (e.g., "ui.admin.users")

        Args:
            rule: Access rule to check
            item: Full objectKey to match against

        Returns:
            True if rule matches item
        """
        if rule.item is None:
            # Generic rule matches everything
            return True

        if not item:
            # No item specified: only generic rules match, and this rule is not generic
            return False

        # Exact match, or prefix match on a "." boundary
        # (e.g., "data.feature.trustee" matches "data.feature.trustee.TrusteePosition")
        return rule.item == item or item.startswith(rule.item + ".")

    def _getItemSpecificity(self, rule: AccessRule, item: str) -> int:
        """
        Calculate the specificity of a rule's item match.

        Args:
            rule: Access rule to rate
            item: Full objectKey the rule was matched against

        Returns:
            0 = generic rule (item=None)
            1 = prefix match
            2 = exact match
        """
        if rule.item is None:
            return 0  # Generic rule - lowest specificity

        if rule.item == item:
            return 2  # Exact match - highest specificity

        if item and item.startswith(rule.item + "."):
            return 1  # Prefix match - medium specificity

        return 0  # No match (shouldn't happen if _ruleMatchesItem was true)

    def findMostSpecificRule(self, rules: List[AccessRule], item: str) -> Optional[AccessRule]:
        """
        Find the most specific rule for an item (longest matching prefix wins).

        An exact match is returned immediately. Otherwise the prefix rule
        with the most dot-separated segments wins; a generic rule (item=None)
        is only used when no prefix rule matches.

        Args:
            rules: List of access rules to search
            item: Item identifier to match

        Returns:
            Most specific matching rule, or None if no match
        """
        if not item:
            # If no item specified, return the first generic rule (item = null)
            return next((r for r in rules if r.item is None), None)

        bestMatch = None
        bestMatchLength = -1

        for rule in rules:
            if rule.item is None:
                # Generic rule - use as fallback if no specific match found
                if bestMatch is None:
                    bestMatch = rule
            elif rule.item == item:
                # Exact match - most specific
                return rule
            elif item.startswith(rule.item + "."):
                # Prefix match - keep it if it is longer than the current best
                matchLength = len(rule.item.split("."))
                if matchLength > bestMatchLength:
                    bestMatch = rule
                    bestMatchLength = matchLength

        return bestMatch

    def validateAccessRule(self, rule: AccessRule) -> bool:
        """
        Validate that CUD permissions are allowed by read permission level (only for DATA context).

        A create/update/delete level must not exceed the read level:
        read NONE allows no CUD, read MY allows MY, read GROUP allows MY and
        GROUP, read ALL allows everything.

        Args:
            rule: AccessRule to validate

        Returns:
            True if rule is valid, False otherwise
        """
        if rule.context != AccessRuleContext.DATA:
            # For UI and RESOURCE contexts, only view is relevant
            return True

        if rule.read is None:
            return False  # DATA context requires read permission

        readLevel = AccessLevel(rule.read)

        # CUD operations are only allowed if read permission exists
        for operation in (rule.create, rule.update, rule.delete):
            if operation is None or operation == AccessLevel.NONE.value:
                continue  # No access is always valid

            if readLevel == AccessLevel.NONE:
                return False  # No CUD allowed if no read access

            if readLevel == AccessLevel.MY and operation not in (
                AccessLevel.NONE.value, AccessLevel.MY.value
            ):
                return False

            if readLevel == AccessLevel.GROUP and operation not in (
                AccessLevel.NONE.value, AccessLevel.MY.value, AccessLevel.GROUP.value
            ):
                return False

        return True

    def _isMorePermissive(self, level1: AccessLevel, level2: AccessLevel) -> bool:
        """
        Check if level1 is more permissive than level2.

        Levels are ranked NONE < MY < GROUP < ALL; an unknown level is
        treated like NONE.

        Args:
            level1: First access level
            level2: Second access level

        Returns:
            True if level1 is more permissive than level2
        """
        return self._LEVEL_RANK.get(level1, 0) > self._LEVEL_RANK.get(level2, 0)