""" Access control for the Application. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime from modules.datamodels.datamodelUam import UserPrivilege, User, UserInDB, Mandate from modules.datamodels.datamodelSecurity import AuthEvent from modules.shared.timezoneUtils import get_utc_now # Configure logger logger = logging.getLogger(__name__) class AppAccess: """ Access control class for Application interface. Handles user access management and permission checks. """ def __init__(self, currentUser: User, db): """Initialize with user context.""" self.currentUser = currentUser self.userId = currentUser.id self.mandateId = currentUser.mandateId self.privilege = currentUser.privilege if not self.mandateId or not self.userId: raise ValueError("Invalid user context: mandateId and userId are required") self.db = db def uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Unified user access management function that filters data based on user privileges and adds access control attributes. Args: model_class: Pydantic model class for the table recordset: Recordset to filter based on access rules Returns: Filtered recordset with access control attributes """ filtered_records = [] table_name = model_class.__name__ # Only SYSADMIN can see mandates if table_name == "Mandate": if self.privilege == UserPrivilege.SYSADMIN: filtered_records = recordset else: filtered_records = [] # Special handling for users table elif table_name == "UserInDB": if self.privilege == UserPrivilege.SYSADMIN: # SysAdmin sees all users filtered_records = recordset elif self.privilege == UserPrivilege.ADMIN: # Admin sees all users in their mandate filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId] else: # Regular users only see themselves filtered_records = [r for r in recordset if r.get("id") == self.userId] # Special handling for connections table elif table_name == "UserConnection": if self.privilege == UserPrivilege.SYSADMIN: # SysAdmin sees all connections filtered_records = recordset elif self.privilege == UserPrivilege.ADMIN: # Admin sees connections for users in their mandate users: List[Dict[str, Any]] = self.db.getRecordset(UserInDB, recordFilter={"mandateId": self.mandateId}) user_ids: List[str] = [str(u["id"]) for u in users] filtered_records = [r for r in recordset if r.get("userId") in user_ids] else: # Regular users only see their own connections filtered_records = [r for r in recordset if r.get("userId") == self.userId] # Special handling for data neutralization config table elif table_name == "DataNeutraliserConfig": if self.privilege == UserPrivilege.SYSADMIN: # SysAdmin sees all configs filtered_records = recordset elif self.privilege == UserPrivilege.ADMIN: # Admin sees configs in their mandate filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId] else: # Regular users only see their own configs filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("userId") == self.userId] # Special handling for data neutralizer attributes table elif table_name == "DataNeutralizerAttributes": if self.privilege == UserPrivilege.SYSADMIN: # SysAdmin sees all attributes filtered_records = recordset elif self.privilege == UserPrivilege.ADMIN: # Admin sees attributes in their mandate filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId] else: # Regular users only see their own attributes filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("userId") == self.userId] # System admins see all other records elif self.privilege == UserPrivilege.SYSADMIN: filtered_records = recordset # For other records, admins see records in their mandate elif self.privilege == UserPrivilege.ADMIN: filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId] # Regular users only see records they own within their mandate else: filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId and r.get("createdBy") == self.userId] # Add access control attributes to each record for record in filtered_records: record_id = record.get("id") # Set access control flags based on user permissions if table_name == "Mandate": record["_hideView"] = False # SYSADMIN can view record["_hideEdit"] = not self.canModify(Mandate, record_id) record["_hideDelete"] = not self.canModify(Mandate, record_id) elif table_name == "UserInDB": record["_hideView"] = False # Everyone can view users they have access to # SysAdmin can edit/delete any user if self.privilege == UserPrivilege.SYSADMIN: record["_hideEdit"] = False record["_hideDelete"] = False # Admin can edit/delete users in their mandate elif self.privilege == UserPrivilege.ADMIN: record["_hideEdit"] = record.get("mandateId","-") != self.mandateId record["_hideDelete"] = record.get("mandateId","-") != self.mandateId # Regular users can only edit themselves else: record["_hideEdit"] = record.get("id") != self.userId record["_hideDelete"] = True # Regular users cannot delete users elif table_name == "UserConnection": # Everyone can view connections they have access to record["_hideView"] = False # SysAdmin can edit/delete any connection if self.privilege == UserPrivilege.SYSADMIN: record["_hideEdit"] = False record["_hideDelete"] = False # Admin can edit/delete connections for users in their mandate elif self.privilege == UserPrivilege.ADMIN: users: List[Dict[str, Any]] = self.db.getRecordset(UserInDB, recordFilter={"mandateId": self.mandateId}) user_ids: List[str] = [str(u["id"]) for u in users] record["_hideEdit"] = record.get("userId") not in user_ids record["_hideDelete"] = record.get("userId") not in user_ids # Regular users can only edit/delete their own connections else: record["_hideEdit"] = record.get("userId") != self.userId record["_hideDelete"] = record.get("userId") != self.userId elif table_name == "DataNeutraliserConfig": # Everyone can view configs they have access to record["_hideView"] = False # SysAdmin can edit/delete any config if self.privilege == UserPrivilege.SYSADMIN: record["_hideEdit"] = False record["_hideDelete"] = False # Admin can edit/delete configs in their mandate elif self.privilege == UserPrivilege.ADMIN: record["_hideEdit"] = record.get("mandateId","-") != self.mandateId record["_hideDelete"] = record.get("mandateId","-") != self.mandateId # Regular users can only edit/delete their own configs else: record["_hideEdit"] = record.get("userId") != self.userId record["_hideDelete"] = record.get("userId") != self.userId elif table_name == "DataNeutralizerAttributes": # Everyone can view attributes they have access to record["_hideView"] = False # SysAdmin can edit/delete any attributes if self.privilege == UserPrivilege.SYSADMIN: record["_hideEdit"] = False record["_hideDelete"] = False # Admin can edit/delete attributes in their mandate elif self.privilege == UserPrivilege.ADMIN: record["_hideEdit"] = record.get("mandateId","-") != self.mandateId record["_hideDelete"] = record.get("mandateId","-") != self.mandateId # Regular users can only edit/delete their own attributes else: record["_hideEdit"] = record.get("userId") != self.userId record["_hideDelete"] = record.get("userId") != self.userId elif table_name == "AuthEvent": # Only show auth events for the current user or if admin if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]: record["_hideView"] = False else: record["_hideView"] = record.get("userId") != self.userId record["_hideEdit"] = True # Auth events can't be edited record["_hideDelete"] = not self.canModify(AuthEvent, record_id) else: # Default access control for other tables record["_hideView"] = False record["_hideEdit"] = not self.canModify(model_class, record_id) record["_hideDelete"] = not self.canModify(model_class, record_id) return filtered_records def canModify(self, model_class: type, recordId: Optional[str] = None) -> bool: """ Checks if the current user can modify (create/update/delete) records in a table. Args: model_class: Pydantic model class for the table recordId: Optional record ID for specific record check Returns: Boolean indicating permission """ table_name = model_class.__name__ # For mandates, only SYSADMIN can modify if table_name == "Mandate": return self.privilege == UserPrivilege.SYSADMIN # System admins can modify anything else if self.privilege == UserPrivilege.SYSADMIN: return True # Check specific record permissions if recordId is not None: # Get the record to check ownership records: List[Dict[str, Any]] = self.db.getRecordset(model_class, recordFilter={"id": str(recordId)}) if not records: return False record = records[0] # Special handling for connections if table_name == "UserConnection": # Admin can modify connections for users in their mandate if self.privilege == UserPrivilege.ADMIN: users: List[Dict[str, Any]] = self.db.getRecordset(UserInDB, recordFilter={"mandateId": self.mandateId}) user_ids: List[str] = [str(u["id"]) for u in users] return record.get("userId") in user_ids # Users can only modify their own connections return record.get("userId") == self.userId # Admins can modify anything in their mandate if self.privilege == UserPrivilege.ADMIN and record.get("mandateId","-") == self.mandateId: return True # Users can only modify their own records if (record.get("mandateId","-") == self.mandateId and record.get("createdBy") == self.userId): return True return False else: # For general table modify permission (e.g., create) # Admins can create anything in their mandate if self.privilege == UserPrivilege.ADMIN: return True # Regular users can create most entities return True