- app.py: Pre-warm AI connectors at module load and in lifespan - aicoreModelRegistry.py: Connector discovery cache, getAvailableModels cache, bulk RBAC, eager prewarm - connectorDbPostgre.py: Connector cache, contextvars for userId, eviction (max 32) - chatbot: Uses _get_cached_connector, Service center integration, BillingService exceptions, BillingService exceptions instead of direct imports - interfaceDbApp.py: Uses _get_cached_connector - interfaceDbManagement.py: Uses _get_cached_connector - security/rbac.py: Adds checkResourceAccessBulk
638 lines
26 KiB
Python
638 lines
26 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
RBAC interface: Core RBAC logic and permission resolution.
|
|
|
|
Multi-Tenant Design:
|
|
- AccessRules referenzieren roleId (FK), nicht roleLabel
|
|
- Rollen werden über UserMandate + UserMandateRole geladen
|
|
- Priorisierung: Instance > Mandate > Global
|
|
- Stateless Design: Kein Cache, direkt aus DB
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional, TYPE_CHECKING
|
|
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext, Role
|
|
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
|
|
from modules.datamodels.datamodelMembership import (
|
|
UserMandate,
|
|
UserMandateRole,
|
|
FeatureAccess,
|
|
FeatureAccessRole
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RbacClass:
|
|
"""
|
|
RBAC interface for permission resolution and rule validation.
|
|
|
|
Multi-Tenant Design:
|
|
- Lädt Rollen über UserMandate + UserMandateRole
|
|
- AccessRules werden über roleId gefunden
|
|
- isSysAdmin für System-Level Operationen (ohne Mandant)
|
|
"""
|
|
|
|
def __init__(self, db: "DatabaseConnector", dbApp: "DatabaseConnector"):
|
|
"""
|
|
Initialize RBAC interface with database connector.
|
|
|
|
Args:
|
|
db: Database connector for general operations (may be from any database)
|
|
dbApp: DbApp database connector for AccessRule queries.
|
|
AccessRule table is always in the DbApp database.
|
|
"""
|
|
self.db = db
|
|
self.dbApp = dbApp
|
|
|
|
def getUserPermissions(
|
|
self,
|
|
user: User,
|
|
context: AccessRuleContext,
|
|
item: str,
|
|
mandateId: Optional[str] = None,
|
|
featureInstanceId: Optional[str] = None
|
|
) -> UserPermissions:
|
|
"""
|
|
Get combined permissions for a user across all their roles.
|
|
|
|
Multi-Tenant Design:
|
|
- Lädt Rollen aus UserMandate + UserMandateRole wenn mandateId gegeben
|
|
- isSysAdmin gibt vollen Zugriff, unabhängig vom Kontext
|
|
|
|
Args:
|
|
user: User object
|
|
context: Access rule context (DATA, UI, RESOURCE)
|
|
item: Item identifier (table name, UI path, resource path)
|
|
mandateId: Optional mandate context for role lookup
|
|
featureInstanceId: Optional feature instance context
|
|
|
|
Returns:
|
|
UserPermissions object with combined permissions
|
|
"""
|
|
permissions = UserPermissions(
|
|
view=False,
|
|
read=AccessLevel.NONE,
|
|
create=AccessLevel.NONE,
|
|
update=AccessLevel.NONE,
|
|
delete=AccessLevel.NONE
|
|
)
|
|
|
|
# Category A: isSysAdmin FLAG bypass (safety net for system operations)
|
|
# NOTE: sysadmin ROLE users get full access via AccessRules (DATA: ALL)
|
|
# This flag bypass is kept as fallback for true system-level operations
|
|
if hasattr(user, 'isSysAdmin') and user.isSysAdmin:
|
|
# User-owned namespaces: SysAdmin gets MY access only (own data).
|
|
# Every user -- including SysAdmin -- only has CRUD for their own
|
|
# chat workflows and files. Automation is excluded because it's
|
|
# managed by admins and the system event user needs ALL access.
|
|
_USER_OWNED_PREFIXES = ("data.chat.", "data.files.")
|
|
if item and any(item.startswith(p) for p in _USER_OWNED_PREFIXES):
|
|
return UserPermissions(
|
|
view=True,
|
|
read=AccessLevel.MY,
|
|
create=AccessLevel.MY,
|
|
update=AccessLevel.MY,
|
|
delete=AccessLevel.MY
|
|
)
|
|
return UserPermissions(
|
|
view=True,
|
|
read=AccessLevel.ALL,
|
|
create=AccessLevel.ALL,
|
|
update=AccessLevel.ALL,
|
|
delete=AccessLevel.ALL
|
|
)
|
|
|
|
# Lade Role-IDs für den User via UserMandate + UserMandateRole
|
|
roleIds = self._getRoleIdsForUser(user, mandateId, featureInstanceId)
|
|
|
|
if not roleIds:
|
|
logger.debug(f"getUserPermissions: NO roles found for user={user.id}, mandateId={mandateId}, featureInstanceId={featureInstanceId}, item={item}")
|
|
return permissions
|
|
|
|
# Lade alle relevanten Regeln für alle Rollen
|
|
allRulesWithPriority = self._getRulesForRoleIds(roleIds, context, mandateId, featureInstanceId)
|
|
|
|
# Für jede Rolle die spezifischste Regel finden
|
|
# Priority order: 1) Role priority (instance > mandate > global), 2) Item specificity (exact > prefix > generic)
|
|
rolePermissions = {}
|
|
for priority, rule in allRulesWithPriority:
|
|
# Find most specific rule for this item
|
|
if self._ruleMatchesItem(rule, item):
|
|
roleId = rule.roleId
|
|
# Calculate item specificity: 0=generic (null), 1=prefix match, 2=exact match
|
|
itemSpecificity = self._getItemSpecificity(rule, item)
|
|
|
|
# Compare: first by role priority, then by item specificity
|
|
if roleId not in rolePermissions:
|
|
rolePermissions[roleId] = (priority, itemSpecificity, rule)
|
|
else:
|
|
existingPriority, existingSpecificity, _ = rolePermissions[roleId]
|
|
# Higher role priority wins, or same priority but higher specificity
|
|
if priority > existingPriority or (priority == existingPriority and itemSpecificity > existingSpecificity):
|
|
rolePermissions[roleId] = (priority, itemSpecificity, rule)
|
|
|
|
# Find highest priority among matching rules (role priority only, specificity already handled per role)
|
|
highestPriority = max((p for p, _, _ in rolePermissions.values()), default=0)
|
|
|
|
# Combine permissions ONLY from rules with highest priority
|
|
# This ensures instance-specific rules (Priority 3) override global rules (Priority 1)
|
|
for roleId, (priority, specificity, rule) in rolePermissions.items():
|
|
# Only use rules with highest priority
|
|
if priority < highestPriority:
|
|
continue
|
|
|
|
# View: union logic - if ANY role has view=true, then view=true
|
|
if rule.view:
|
|
permissions.view = True
|
|
|
|
if context == AccessRuleContext.DATA:
|
|
# For DATA context, use most permissive access level across roles at same priority
|
|
if rule.read and self._isMorePermissive(rule.read, permissions.read):
|
|
permissions.read = rule.read
|
|
if rule.create and self._isMorePermissive(rule.create, permissions.create):
|
|
permissions.create = rule.create
|
|
if rule.update and self._isMorePermissive(rule.update, permissions.update):
|
|
permissions.update = rule.update
|
|
if rule.delete and self._isMorePermissive(rule.delete, permissions.delete):
|
|
permissions.delete = rule.delete
|
|
|
|
return permissions
|
|
|
|
def checkResourceAccessBulk(
|
|
self,
|
|
user: User,
|
|
resourcePaths: List[str],
|
|
mandateId: Optional[str] = None,
|
|
featureInstanceId: Optional[str] = None
|
|
) -> Dict[str, bool]:
|
|
"""
|
|
Check view access for multiple RESOURCE paths in one pass.
|
|
Uses same logic as getUserPermissions but batches DB access.
|
|
Returns {path: has_view}.
|
|
"""
|
|
result = {p: False for p in resourcePaths}
|
|
if not resourcePaths:
|
|
return result
|
|
# SysAdmin bypass
|
|
if hasattr(user, "isSysAdmin") and user.isSysAdmin:
|
|
return {p: True for p in resourcePaths}
|
|
roleIds = self._getRoleIdsForUser(user, mandateId, featureInstanceId)
|
|
if not roleIds:
|
|
return result
|
|
rulesWithPriority = self._getRulesForRoleIds(
|
|
roleIds, AccessRuleContext.RESOURCE, mandateId, featureInstanceId
|
|
)
|
|
for path in resourcePaths:
|
|
rolePermissions = {}
|
|
for priority, rule in rulesWithPriority:
|
|
if not self._ruleMatchesItem(rule, path):
|
|
continue
|
|
roleId = rule.roleId
|
|
itemSpecificity = self._getItemSpecificity(rule, path)
|
|
if roleId not in rolePermissions:
|
|
rolePermissions[roleId] = (priority, itemSpecificity, rule)
|
|
else:
|
|
existingPriority, existingSpecificity, _ = rolePermissions[roleId]
|
|
if priority > existingPriority or (
|
|
priority == existingPriority and itemSpecificity > existingSpecificity
|
|
):
|
|
rolePermissions[roleId] = (priority, itemSpecificity, rule)
|
|
highestPriority = max((p for p, _, _ in rolePermissions.values()), default=0)
|
|
for _, (priority, _, rule) in rolePermissions.items():
|
|
if priority >= highestPriority and rule.view:
|
|
result[path] = True
|
|
break
|
|
return result
|
|
|
|
def _getRoleIdsForUser(
|
|
self,
|
|
user: User,
|
|
mandateId: Optional[str],
|
|
featureInstanceId: Optional[str]
|
|
) -> List[str]:
|
|
"""
|
|
Get all role IDs for a user in the given context.
|
|
Uses UserMandate + UserMandateRole for the multi-tenant model.
|
|
|
|
Each mandate has its own instances of system roles (admin, user, viewer)
|
|
which are copied from the global templates during mandate creation.
|
|
Therefore, only the requested mandate's roles are loaded - no need to
|
|
load root mandate roles separately.
|
|
|
|
Loads roles from:
|
|
1. The requested mandate (if provided) - includes mandate-instance system roles
|
|
2. Feature instance roles (if featureInstanceId provided)
|
|
|
|
Args:
|
|
user: User object
|
|
mandateId: Mandate context
|
|
featureInstanceId: Feature instance context
|
|
|
|
Returns:
|
|
List of role IDs
|
|
"""
|
|
roleIds = set() # Use set to avoid duplicates
|
|
|
|
try:
|
|
if mandateId:
|
|
# Specific mandate context: load roles from that mandate only
|
|
userMandateRecords = self.dbApp.getRecordset(
|
|
UserMandate,
|
|
recordFilter={"userId": user.id, "mandateId": mandateId, "enabled": True}
|
|
)
|
|
|
|
if userMandateRecords:
|
|
userMandateId = userMandateRecords[0]["id"]
|
|
|
|
# Lade UserMandateRoles (Mandate-level roles)
|
|
userMandateRoleRecords = self.dbApp.getRecordset(
|
|
UserMandateRole,
|
|
recordFilter={"userMandateId": userMandateId}
|
|
)
|
|
|
|
foundRoles = [r["roleId"] for r in userMandateRoleRecords if r.get("roleId")]
|
|
roleIds.update(foundRoles)
|
|
else:
|
|
# No mandate context: load roles from ALL user's mandates.
|
|
# Required for user-owned namespaces (files, chat, automation) that
|
|
# are accessed without mandate context (e.g., /api/files/ endpoints).
|
|
# Data isolation is still enforced by _createdBy WHERE clause.
|
|
allUserMandates = self.dbApp.getRecordset(
|
|
UserMandate,
|
|
recordFilter={"userId": user.id, "enabled": True}
|
|
)
|
|
|
|
for um in allUserMandates:
|
|
userMandateId = um["id"]
|
|
userMandateRoleRecords = self.dbApp.getRecordset(
|
|
UserMandateRole,
|
|
recordFilter={"userMandateId": userMandateId}
|
|
)
|
|
roleIds.update([r["roleId"] for r in userMandateRoleRecords if r.get("roleId")])
|
|
|
|
# Load FeatureAccess + FeatureAccessRole (Instance-level roles)
|
|
if featureInstanceId:
|
|
# Specific feature instance: load roles from that instance only
|
|
featureAccessRecords = self.dbApp.getRecordset(
|
|
FeatureAccess,
|
|
recordFilter={
|
|
"userId": user.id,
|
|
"featureInstanceId": featureInstanceId,
|
|
"enabled": True
|
|
}
|
|
)
|
|
|
|
if featureAccessRecords:
|
|
featureAccessId = featureAccessRecords[0]["id"]
|
|
|
|
featureAccessRoleRecords = self.dbApp.getRecordset(
|
|
FeatureAccessRole,
|
|
recordFilter={"featureAccessId": featureAccessId}
|
|
)
|
|
|
|
roleIds.update([r["roleId"] for r in featureAccessRoleRecords if r.get("roleId")])
|
|
elif not mandateId:
|
|
# No context at all: also load feature-instance roles from ALL user's accesses.
|
|
# Same rationale: user-owned data needs roles for permission resolution.
|
|
allFeatureAccess = self.dbApp.getRecordset(
|
|
FeatureAccess,
|
|
recordFilter={"userId": user.id, "enabled": True}
|
|
)
|
|
|
|
for fa in allFeatureAccess:
|
|
featureAccessId = fa["id"]
|
|
featureAccessRoleRecords = self.dbApp.getRecordset(
|
|
FeatureAccessRole,
|
|
recordFilter={"featureAccessId": featureAccessId}
|
|
)
|
|
roleIds.update([r["roleId"] for r in featureAccessRoleRecords if r.get("roleId")])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading role IDs for user {user.id}: {e}")
|
|
|
|
return list(roleIds)
|
|
|
|
def getRulesForUserBulk(
|
|
self,
|
|
userId: str,
|
|
mandateId: str,
|
|
featureInstanceId: Optional[str] = None
|
|
) -> List[tuple]:
|
|
"""
|
|
Lädt alle relevanten Regeln für einen User in EINEM Query.
|
|
Stateless: Kein Cache, direkt aus DB.
|
|
|
|
Optimiert für Multi-Tenant mit Junction Tables:
|
|
- Mandant-Rollen via UserMandate → UserMandateRole
|
|
- Instanz-Rollen via FeatureAccess → FeatureAccessRole
|
|
|
|
Args:
|
|
userId: User ID
|
|
mandateId: Mandate context
|
|
featureInstanceId: Optional feature instance context
|
|
|
|
Returns:
|
|
Liste von (priority, AccessRule) Tupeln
|
|
"""
|
|
if not mandateId:
|
|
return []
|
|
|
|
try:
|
|
conn = self.dbApp.connection
|
|
roleIds = set()
|
|
|
|
# 1. Mandant-Rollen via UserMandate → UserMandateRole (SINGLE Query)
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT umr."roleId"
|
|
FROM "UserMandate" um
|
|
JOIN "UserMandateRole" umr ON umr."userMandateId" = um.id
|
|
WHERE um."userId" = %s AND um."mandateId" = %s AND um."enabled" = true
|
|
""",
|
|
(userId, mandateId)
|
|
)
|
|
mandateRoles = cursor.fetchall()
|
|
roleIds.update(r["roleId"] for r in mandateRoles if r.get("roleId"))
|
|
|
|
# 2. Instanz-Rollen via FeatureAccess → FeatureAccessRole (SINGLE Query)
|
|
if featureInstanceId:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT far."roleId"
|
|
FROM "FeatureAccess" fa
|
|
JOIN "FeatureAccessRole" far ON far."featureAccessId" = fa.id
|
|
WHERE fa."userId" = %s AND fa."featureInstanceId" = %s AND fa."enabled" = true
|
|
""",
|
|
(userId, featureInstanceId)
|
|
)
|
|
instanceRoles = cursor.fetchall()
|
|
roleIds.update(r["roleId"] for r in instanceRoles if r.get("roleId"))
|
|
|
|
if not roleIds:
|
|
return []
|
|
|
|
# 3. BULK Query: Alle Regeln für alle Rollen + zugehörige Role-Daten
|
|
# SINGLE Query mit JOIN statt N+1
|
|
roleIdsList = list(roleIds)
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(
|
|
"""
|
|
SELECT ar.*, r."mandateId" as "roleMandateId",
|
|
r."featureInstanceId" as "roleInstanceId"
|
|
FROM "AccessRule" ar
|
|
JOIN "Role" r ON ar."roleId" = r.id
|
|
WHERE ar."roleId" = ANY(%s)
|
|
""",
|
|
(roleIdsList,)
|
|
)
|
|
allRulesWithContext = cursor.fetchall()
|
|
|
|
# 4. Priorität zuweisen basierend auf Role-Scope
|
|
rulesWithPriority = []
|
|
for ruleRecord in allRulesWithContext:
|
|
ruleDict = dict(ruleRecord)
|
|
|
|
# Bestimme Priorität
|
|
if ruleDict.get("roleInstanceId"):
|
|
priority = 3 # Instance-Rolle = höchste Priorität
|
|
elif ruleDict.get("roleMandateId"):
|
|
priority = 2 # Mandate-Rolle
|
|
else:
|
|
priority = 1 # Global-Rolle = niedrigste Priorität
|
|
|
|
# Entferne Hilfsspalten vor AccessRule-Erstellung
|
|
ruleDict.pop("roleMandateId", None)
|
|
ruleDict.pop("roleInstanceId", None)
|
|
|
|
try:
|
|
rule = AccessRule(**ruleDict)
|
|
rulesWithPriority.append((priority, rule))
|
|
except Exception as e:
|
|
logger.error(f"Error converting rule record: {e}")
|
|
|
|
return rulesWithPriority
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in getRulesForUserBulk: {e}")
|
|
return []
|
|
|
|
def _getRulesForRoleIds(
|
|
self,
|
|
roleIds: List[str],
|
|
context: AccessRuleContext,
|
|
mandateId: Optional[str],
|
|
featureInstanceId: Optional[str]
|
|
) -> List[tuple]:
|
|
"""
|
|
Get all access rules for the given role IDs with priority.
|
|
|
|
Priority:
|
|
- 3: Instance-specific role (featureInstanceId set)
|
|
- 2: Mandate-specific role (mandateId set, no featureInstanceId)
|
|
- 1: Global role (no mandateId)
|
|
|
|
Args:
|
|
roleIds: List of role IDs
|
|
context: Access rule context
|
|
mandateId: Current mandate context
|
|
featureInstanceId: Current feature instance context
|
|
|
|
Returns:
|
|
List of (priority, AccessRule) tuples
|
|
"""
|
|
rulesWithPriority = []
|
|
|
|
if not roleIds:
|
|
return rulesWithPriority
|
|
|
|
try:
|
|
# Lade alle Regeln für alle Rollen
|
|
for roleId in roleIds:
|
|
rules = self.dbApp.getRecordset(
|
|
AccessRule,
|
|
recordFilter={"roleId": roleId, "context": context.value}
|
|
)
|
|
|
|
# Lade Role um Priorität zu bestimmen
|
|
roleRecords = self.dbApp.getRecordset(Role, recordFilter={"id": roleId})
|
|
if not roleRecords:
|
|
continue
|
|
|
|
# Convert to Pydantic model for type-safe access
|
|
roleDict = {k: v for k, v in roleRecords[0].items() if not k.startswith("_")}
|
|
role = Role(**roleDict)
|
|
|
|
# Bestimme Priorität basierend auf Role-Scope
|
|
if role.featureInstanceId:
|
|
priority = 3 # Instance-specific
|
|
elif role.mandateId:
|
|
priority = 2 # Mandate-specific
|
|
else:
|
|
priority = 1 # Global
|
|
|
|
for ruleRecord in rules:
|
|
try:
|
|
rule = AccessRule(**ruleRecord)
|
|
rulesWithPriority.append((priority, rule))
|
|
except Exception as e:
|
|
logger.error(f"Error converting rule record: {e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading rules for role IDs: {e}")
|
|
|
|
return rulesWithPriority
|
|
|
|
def _ruleMatchesItem(self, rule: AccessRule, item: str) -> bool:
|
|
"""
|
|
Check if a rule matches the given item.
|
|
|
|
Matching rules (in order of specificity):
|
|
1. Generic rule (item=None) matches everything
|
|
2. Exact match (rule.item == item)
|
|
3. Prefix match (item starts with rule.item + ".")
|
|
Example: rule "data.feature.trustee" matches item "data.feature.trustee.TrusteePosition"
|
|
|
|
All items MUST use the full objectKey format:
|
|
- UAM: data.uam.{TableName} (e.g., "data.uam.UserInDB")
|
|
- Chat: data.chat.{TableName} (e.g., "data.chat.ChatWorkflow")
|
|
- Files: data.files.{TableName} (e.g., "data.files.FileItem")
|
|
- Automation: data.automation.{TableName} (e.g., "data.automation.AutomationDefinition")
|
|
- Feature: data.feature.{featureCode}.{TableName} (e.g., "data.feature.trustee.TrusteePosition")
|
|
- UI: ui.{area}.{page} (e.g., "ui.admin.users")
|
|
|
|
Args:
|
|
rule: Access rule to check
|
|
item: Full objectKey to match against
|
|
|
|
Returns:
|
|
True if rule matches item
|
|
"""
|
|
if rule.item is None:
|
|
# Generic rule matches everything
|
|
return True
|
|
|
|
if not item:
|
|
# No item specified, only generic rules match
|
|
return rule.item is None
|
|
|
|
# Exact match
|
|
if rule.item == item:
|
|
return True
|
|
|
|
# Prefix match (e.g., "data.feature.trustee" matches "data.feature.trustee.TrusteePosition")
|
|
if item.startswith(rule.item + "."):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _getItemSpecificity(self, rule: AccessRule, item: str) -> int:
|
|
"""
|
|
Calculate the specificity of a rule's item match.
|
|
|
|
Returns:
|
|
0 = generic rule (item=None)
|
|
1 = prefix match
|
|
2 = exact match
|
|
"""
|
|
if rule.item is None:
|
|
return 0 # Generic rule - lowest specificity
|
|
if rule.item == item:
|
|
return 2 # Exact match - highest specificity
|
|
if item and item.startswith(rule.item + "."):
|
|
return 1 # Prefix match - medium specificity
|
|
return 0 # No match (shouldn't happen if _ruleMatchesItem was true)
|
|
|
|
def findMostSpecificRule(self, rules: List[AccessRule], item: str) -> Optional[AccessRule]:
|
|
"""
|
|
Find the most specific rule for an item (longest matching prefix wins).
|
|
|
|
Args:
|
|
rules: List of access rules to search
|
|
item: Item identifier to match
|
|
|
|
Returns:
|
|
Most specific matching rule, or None if no match
|
|
"""
|
|
if not item:
|
|
# If no item specified, return generic rule (item = null)
|
|
genericRules = [r for r in rules if r.item is None]
|
|
return genericRules[0] if genericRules else None
|
|
|
|
# Find longest matching prefix
|
|
bestMatch = None
|
|
bestMatchLength = -1
|
|
|
|
for rule in rules:
|
|
if rule.item is None:
|
|
# Generic rule - use as fallback if no specific match found
|
|
if bestMatch is None:
|
|
bestMatch = rule
|
|
elif rule.item == item:
|
|
# Exact match - most specific
|
|
return rule
|
|
elif item.startswith(rule.item + "."):
|
|
# Prefix match - check if it's longer than current best
|
|
matchLength = len(rule.item.split("."))
|
|
if matchLength > bestMatchLength:
|
|
bestMatch = rule
|
|
bestMatchLength = matchLength
|
|
|
|
return bestMatch
|
|
|
|
def validateAccessRule(self, rule: AccessRule) -> bool:
|
|
"""
|
|
Validate that CUD permissions are allowed by read permission level (only for DATA context).
|
|
|
|
Args:
|
|
rule: AccessRule to validate
|
|
|
|
Returns:
|
|
True if rule is valid, False otherwise
|
|
"""
|
|
if rule.context != AccessRuleContext.DATA:
|
|
# For UI and RESOURCE contexts, only view is relevant
|
|
return True
|
|
|
|
if rule.read is None:
|
|
return False # DATA context requires read permission
|
|
|
|
readLevel = AccessLevel(rule.read)
|
|
|
|
# CUD operations are only allowed if read permission exists
|
|
for operation in [rule.create, rule.update, rule.delete]:
|
|
if operation is None or operation == AccessLevel.NONE.value:
|
|
continue # No access is always valid
|
|
if readLevel == AccessLevel.NONE:
|
|
return False # No CUD allowed if no read access
|
|
if readLevel == AccessLevel.MY and operation not in [AccessLevel.NONE.value, AccessLevel.MY.value]:
|
|
return False
|
|
if readLevel == AccessLevel.GROUP and operation not in [AccessLevel.NONE.value, AccessLevel.MY.value, AccessLevel.GROUP.value]:
|
|
return False
|
|
|
|
return True
|
|
|
|
def _isMorePermissive(self, level1: AccessLevel, level2: AccessLevel) -> bool:
|
|
"""
|
|
Check if level1 is more permissive than level2.
|
|
|
|
Args:
|
|
level1: First access level
|
|
level2: Second access level
|
|
|
|
Returns:
|
|
True if level1 is more permissive than level2
|
|
"""
|
|
hierarchy = {
|
|
AccessLevel.NONE: 0,
|
|
AccessLevel.MY: 1,
|
|
AccessLevel.GROUP: 2,
|
|
AccessLevel.ALL: 3
|
|
}
|
|
return hierarchy.get(level1, 0) > hierarchy.get(level2, 0)
|