gateway/modules/interfaces/serviceAppAccess.py
2025-06-12 01:11:33 +02:00

259 lines
No EOL
11 KiB
Python

"""
Access control for the Application.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from modules.interfaces.serviceAppModel import UserPrivilege, Session, User
# Configure logger
logger = logging.getLogger(__name__)
class AppAccess:
"""
Access control class for Application interface.
Handles user access management and permission checks.
"""
def __init__(self, currentUser: User, db):
"""Initialize with user context."""
self.currentUser = currentUser
self.userId = currentUser.id
self.mandateId = currentUser.mandateId
self.privilege = currentUser.privilege
if not self.mandateId or not self.userId:
raise ValueError("Invalid user context: mandateId and userId are required")
self.db = db
def uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Unified user access management function that filters data based on user privileges
and adds access control attributes.
Args:
table: Name of the table
recordset: Recordset to filter based on access rules
Returns:
Filtered recordset with access control attributes
"""
filtered_records = []
# Only SYSADMIN can see mandates
if table == "mandates":
if self.privilege == UserPrivilege.SYSADMIN:
filtered_records = recordset
else:
filtered_records = []
# Special handling for users table
elif table == "users":
if self.privilege == UserPrivilege.SYSADMIN:
# SysAdmin sees all users
filtered_records = recordset
elif self.privilege == UserPrivilege.ADMIN:
# Admin sees all users in their mandate
filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
else:
# Regular users only see themselves
filtered_records = [r for r in recordset if r.get("id") == self.userId]
# Special handling for connections table
elif table == "connections":
if self.privilege == UserPrivilege.SYSADMIN:
# SysAdmin sees all connections
filtered_records = recordset
elif self.privilege == UserPrivilege.ADMIN:
# Admin sees connections for users in their mandate
users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})
user_ids: List[str] = [str(u["id"]) for u in users]
filtered_records = [r for r in recordset if r.get("userId") in user_ids]
else:
# Regular users only see their own connections
filtered_records = [r for r in recordset if r.get("userId") == self.userId]
# System admins see all other records
elif self.privilege == UserPrivilege.SYSADMIN:
filtered_records = recordset
# For other records, admins see records in their mandate
elif self.privilege == UserPrivilege.ADMIN:
filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId]
# Regular users only see records they own within their mandate
else:
filtered_records = [r for r in recordset
if r.get("mandateId","-") == self.mandateId and r.get("createdBy") == self.userId]
# Add access control attributes to each record
for record in filtered_records:
record_id = record.get("id")
# Set access control flags based on user permissions
if table == "mandates":
record["_hideView"] = False # SYSADMIN can view
record["_hideEdit"] = not self.canModify("mandates", record_id)
record["_hideDelete"] = not self.canModify("mandates", record_id)
elif table == "users":
record["_hideView"] = False # Everyone can view users they have access to
# SysAdmin can edit/delete any user
if self.privilege == UserPrivilege.SYSADMIN:
record["_hideEdit"] = False
record["_hideDelete"] = False
# Admin can edit/delete users in their mandate
elif self.privilege == UserPrivilege.ADMIN:
record["_hideEdit"] = record.get("mandateId","-") != self.mandateId
record["_hideDelete"] = record.get("mandateId","-") != self.mandateId
# Regular users can only edit themselves
else:
record["_hideEdit"] = record.get("id") != self.userId
record["_hideDelete"] = True # Regular users cannot delete users
elif table == "connections":
# Everyone can view connections they have access to
record["_hideView"] = False
# SysAdmin can edit/delete any connection
if self.privilege == UserPrivilege.SYSADMIN:
record["_hideEdit"] = False
record["_hideDelete"] = False
# Admin can edit/delete connections for users in their mandate
elif self.privilege == UserPrivilege.ADMIN:
users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})
user_ids: List[str] = [str(u["id"]) for u in users]
record["_hideEdit"] = record.get("userId") not in user_ids
record["_hideDelete"] = record.get("userId") not in user_ids
# Regular users can only edit/delete their own connections
else:
record["_hideEdit"] = record.get("userId") != self.userId
record["_hideDelete"] = record.get("userId") != self.userId
elif table == "sessions":
# Only show sessions for the current user or if admin
if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
record["_hideView"] = False
else:
record["_hideView"] = record.get("userId") != self.userId
record["_hideEdit"] = True # Sessions can't be edited
record["_hideDelete"] = not self.canModify("sessions", record_id)
elif table == "auth_events":
# Only show auth events for the current user or if admin
if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
record["_hideView"] = False
else:
record["_hideView"] = record.get("userId") != self.userId
record["_hideEdit"] = True # Auth events can't be edited
record["_hideDelete"] = not self.canModify("auth_events", record_id)
else:
# Default access control for other tables
record["_hideView"] = False
record["_hideEdit"] = not self.canModify(table, record_id)
record["_hideDelete"] = not self.canModify(table, record_id)
return filtered_records
def canModify(self, table: str, recordId: Optional[str] = None) -> bool:
"""
Checks if the current user can modify (create/update/delete) records in a table.
Args:
table: Name of the table
recordId: Optional record ID for specific record check
Returns:
Boolean indicating permission
"""
# For mandates, only SYSADMIN can modify
if table == "mandates":
return self.privilege == UserPrivilege.SYSADMIN
# System admins can modify anything else
if self.privilege == UserPrivilege.SYSADMIN:
return True
# Check specific record permissions
if recordId is not None:
# Get the record to check ownership
records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": str(recordId)})
if not records:
return False
record = records[0]
# Special handling for connections
if table == "connections":
# Admin can modify connections for users in their mandate
if self.privilege == UserPrivilege.ADMIN:
users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})
user_ids: List[str] = [str(u["id"]) for u in users]
return record.get("userId") in user_ids
# Users can only modify their own connections
return record.get("userId") == self.userId
# Admins can modify anything in their mandate
if self.privilege == UserPrivilege.ADMIN and record.get("mandateId","-") == self.mandateId:
return True
# Users can only modify their own records
if (record.get("mandateId","-") == self.mandateId and
record.get("createdBy") == self.userId):
return True
return False
else:
# For general table modify permission (e.g., create)
# Admins can create anything in their mandate
if self.privilege == UserPrivilege.ADMIN:
return True
# Regular users can create most entities
return True
def validateSession(self, sessionId: str) -> bool:
"""
Validates a user session.
Args:
sessionId: ID of the session to validate
Returns:
Boolean indicating if session is valid
"""
try:
# Get session
sessions: List[Dict[str, Any]] = self.db.getRecordset("sessions", recordFilter={"id": sessionId})
if not sessions:
return False
session = sessions[0]
# Check if session is expired
if datetime.now() > session["expiresAt"]:
return False
# Check if user has permission to access this session
if session["userId"] != self.userId and self.privilege not in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
return False
# Update last activity
self.db.recordModify("sessions", sessionId, {
"lastActivity": datetime.now()
})
return True
except Exception as e:
logger.error(f"Error validating session: {str(e)}")
return False
def canAccessAuthEvents(self, userId: str) -> bool:
"""
Checks if the current user can access auth events for a specific user.
Args:
userId: ID of the user whose auth events to check
Returns:
Boolean indicating permission
"""
# System admins and admins can access all auth events
if self.privilege in [UserPrivilege.SYSADMIN, UserPrivilege.ADMIN]:
return True
# Regular users can only access their own auth events
return userId == self.userId