gateway/modules/interfaces/interfaceComponentAccess.py
2025-09-13 01:57:53 +02:00

202 lines
No EOL
8.8 KiB
Python

"""
Access control module for Management interface.
Handles user access management and permission checks.
"""
import logging
from typing import Dict, Any, List, Optional
from modules.interfaces.interfaceAppModel import User, UserInDB
from modules.interfaces.interfaceComponentModel import Prompt, FileItem, FileData, VoiceSettings
from modules.interfaces.interfaceChatModel import ChatWorkflow, ChatMessage, ChatLog
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
class ComponentAccess:
    """
    Access control class for Management interface.
    Handles user access management and permission checks.

    Three privilege levels are distinguished: "sysadmin", "admin", and
    everything else (treated as a regular user).
    """

    def __init__(self, currentUser: User, db):
        """
        Initialize with user context.
        Args:
            currentUser: User whose id, mandateId and privilege drive all checks
            db: Database interface; must offer getRecordset(model_class, recordFilter=...)
        """
        self.currentUser = currentUser
        self.userId = currentUser.id
        self.mandateId = currentUser.mandateId
        self.privilege = currentUser.privilege
        self.db = db

    def getInitialUserid(self):
        """Return the id of the initial user (currently a fixed placeholder)."""
        # TODO: fetch from the admin DB instead, e.g. self.db.getInitialUserId()
        return "----"

    def canModifyAttribute(self, table: Any, attribute: str) -> bool:
        """
        Checks if the current user can modify a specific attribute in a table.
        Args:
            table: Name of the table, or the model class that represents it
            attribute: Name of the attribute
        Returns:
            Boolean indicating permission
        """
        # Callers pass either a table name ("prompts") or a model class (Prompt);
        # normalise both so the special case below cannot be bypassed by accident.
        tableName = table if isinstance(table, str) else getattr(table, "__name__", "")
        # Special case for mandateId of prompts: only system admins may change it
        if tableName in ("prompts", "Prompt") and attribute == "mandateId":
            return self.privilege == "sysadmin"
        return True

    def uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Unified user access management function that filters data based on user privileges
        and adds access control attributes.
        The record dicts that survive filtering are annotated in place with
        "_hide..." flags; for sysadmins the input list itself is returned.
        Args:
            model_class: Pydantic model class for the table
            recordset: Recordset to filter based on access rules
        Returns:
            Filtered recordset with access control attributes
        """
        table_name = model_class.__name__
        filtered_records = self._filterVisible(table_name, recordset)
        for record in filtered_records:
            self._applyAccessFlags(model_class, table_name, record)
        return filtered_records

    def _filterVisible(self, table_name: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the records of recordset that the current user is allowed to see."""
        privilege = self.privilege
        if privilege == "sysadmin":
            return recordset  # System admins see all records
        if privilege == "admin" or table_name == "Prompt":
            # Admins see their whole mandate; regular users see all prompts of their mandate
            return [r for r in recordset if r.get("mandateId") == self.mandateId]
        if table_name == "UserInDB":
            # Regular users only see their own user record
            return [r for r in recordset if r.get("id") == self.userId]
        if table_name == "VoiceSettings":
            # Regular users only see their own voice settings
            return [r for r in recordset if r.get("userId") == self.userId]
        # Any other table: own records inside the own mandate
        return [
            r for r in recordset
            if r.get("mandateId") == self.mandateId and r.get("_createdBy") == self.userId
        ]

    def _applyAccessFlags(self, model_class: type, table_name: str, record: Dict[str, Any]) -> None:
        """Write the "_hide..." access control flags for one record, in place."""
        record_id = record.get("id")
        record["_hideView"] = False  # Whatever passed the filter may be viewed

        if table_name == "UserInDB":
            # Users can only modify their own record and connections.
            # NOTE(review): this ignores privilege, so sysadmins see other users
            # as locked although canModify() would allow them - confirm intent.
            locked = record_id != self.userId
            record["_hideEdit"] = locked
            record["_hideDelete"] = locked
            # A missing or None connections entry is treated as "no connections"
            for conn in record.get("connections") or ():
                conn["_hideEdit"] = locked
                conn["_hideDelete"] = locked
            return

        if table_name == "VoiceSettings":
            # Voice settings belong to exactly one user
            locked = record.get("userId") != self.userId
            record["_hideEdit"] = locked
            record["_hideDelete"] = locked
            return

        if table_name in ("ChatMessage", "ChatLog"):
            # Messages and logs inherit the permission of their parent workflow.
            # NOTE(review): a record without workflowId falls into canModify's
            # "no record id" path and is therefore reported as editable.
            allowed = self.canModify(ChatWorkflow, record.get("workflowId"))
        else:
            # Prompt, FileItem, ChatWorkflow and all other tables: check the record itself.
            allowed = self.canModify(model_class, record_id)

        # canModify may hit the database, so its result is computed once and reused
        record["_hideEdit"] = not allowed
        record["_hideDelete"] = not allowed
        if table_name == "FileItem":
            record["_hideDownload"] = not allowed
        elif table_name == "Prompt" and "mandateId" in record:
            # Attribute-level permission for mandateId
            record["_hideEdit_mandateId"] = not self.canModifyAttribute(model_class, "mandateId")

    def canModify(self, model_class: type, recordId: Optional[Any] = None) -> bool:
        """
        Checks if the current user can modify (create/update/delete) records in a table.
        Args:
            model_class: Pydantic model class for the table
            recordId: Optional record ID for specific record check
        Returns:
            Boolean indicating permission
        """
        privilege = self.privilege
        # System admins can modify anything
        if privilege == "sysadmin":
            return True
        # General modification permission (e.g. create): allowed for admins and users alike
        if recordId is None:
            return True

        # Load the record to check ownership; an unknown id is never modifiable
        records: List[Dict[str, Any]] = self.db.getRecordset(model_class, recordFilter={"id": recordId})
        if not records:
            return False
        record = records[0]

        model_name = model_class.__name__
        # Users table: only the own user record
        if model_name == "UserInDB":
            return record.get("id") == self.userId
        # Voice settings: only the own settings
        if model_name == "VoiceSettings":
            return record.get("userId") == self.userId

        # The "-" default keeps records without a mandateId from matching a user
        # whose own mandateId is None.
        if record.get("mandateId", "-") != self.mandateId:
            return False
        # Inside the own mandate: admins may modify everything, users only their own records
        return privilege == "admin" or record.get("_createdBy") == self.userId