1409 lines
58 KiB
Python
1409 lines
58 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Interface to Trustee database.
|
|
Manages trustee organisations, roles, access, contracts, documents, and positions.
|
|
"""
|
|
|
|
import logging
|
|
import math
|
|
import uuid
|
|
from typing import Dict, Any, List, Optional, Union
|
|
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
|
|
from modules.security.rbac import RbacClass
|
|
from modules.datamodels.datamodelUam import User, AccessLevel
|
|
from modules.datamodels.datamodelRbac import AccessRuleContext
|
|
from .datamodelFeatureTrustee import (
|
|
TrusteeOrganisation,
|
|
TrusteeRole,
|
|
TrusteeAccess,
|
|
TrusteeContract,
|
|
TrusteeDocument,
|
|
TrusteePosition,
|
|
TrusteePositionDocument,
|
|
)
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Module-level cache of TrusteeObjects instances, keyed by user, mandate and feature instance (filled by getInterface)
|
|
_trusteeInterfaces = {}
|
|
|
|
|
|
def getInterface(currentUser: User, mandateId: Optional[Union[str, uuid.UUID]] = None, featureInstanceId: Optional[str] = None) -> "TrusteeObjects":
    """Return the cached TrusteeObjects for a user context, creating it on first use.

    Args:
        currentUser: The authenticated user; must be set and carry a non-empty ``id``.
        mandateId: Mandate ID from the RequestContext (X-Mandate-Id header).
            Converted to ``str`` when truthy, otherwise passed on as None.
        featureInstanceId: Feature instance ID from the RequestContext
            (X-Feature-Instance-Id header). Converted the same way.

    Returns:
        The TrusteeObjects instance held in the module-level cache for this
        user / mandate / feature-instance combination.

    Raises:
        ValueError: If ``currentUser`` is falsy or has no ``id``.
    """
    if not (currentUser and currentUser.id):
        raise ValueError("Valid user context required")

    mandateKey = str(mandateId) if mandateId else None
    instanceKey = str(featureInstanceId) if featureInstanceId else None

    # The feature instance is part of the key so that instances stay isolated.
    contextKey = f"{currentUser.id}_{mandateKey}_{instanceKey}"

    interface = _trusteeInterfaces.get(contextKey)
    if interface is None:
        interface = TrusteeObjects(currentUser, mandateId=mandateKey, featureInstanceId=instanceKey)
        _trusteeInterfaces[contextKey] = interface
    else:
        # Re-bind the user on a cache hit so the instance reflects this call's context.
        interface.setUserContext(currentUser, mandateId=mandateKey, featureInstanceId=instanceKey)

    return interface
|
|
|
|
|
|
class TrusteeObjects:
|
|
"""
|
|
Interface to Trustee database.
|
|
Manages trustee organisations, roles, access, contracts, documents, and positions.
|
|
"""
|
|
|
|
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """Set up the Trustee interface for an optional user context.

    Stores the context values, opens the database connection and, when a
    user is supplied, binds that user via ``setUserContext``.

    Args:
        currentUser: The authenticated user, or None to defer binding.
        mandateId: Mandate ID from the RequestContext (X-Mandate-Id header).
        featureInstanceId: Feature instance ID from the RequestContext
            (X-Feature-Instance-Id header).
    """
    self.currentUser = currentUser
    self.userId = None if not currentUser else currentUser.id
    # The mandate is taken from the request context, not from the user object.
    self.mandateId = mandateId
    self.featureInstanceId = featureInstanceId
    self.rbac = None

    # The connection must exist before setUserContext touches self.db.
    self._initializeDatabase()

    if not currentUser:
        return
    self.setUserContext(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
|
|
|
|
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """Bind a user, mandate and feature instance to this interface.

    Args:
        currentUser: The authenticated user. A falsy value is logged and ignored.
        mandateId: Mandate ID from the RequestContext (X-Mandate-Id header).
        featureInstanceId: Feature instance ID from the RequestContext
            (X-Feature-Instance-Id header).

    Raises:
        ValueError: If the user has no ``id``, or if ``mandateId`` is missing
            and the user's ``isSysAdmin`` attribute is absent or falsy.
    """
    if not currentUser:
        logger.info("Initializing interface without user context")
        return

    self.currentUser = currentUser
    self.userId = currentUser.id
    # The mandate is taken from the request context, not from the user object.
    self.mandateId = mandateId
    self.featureInstanceId = featureInstanceId

    if not self.userId:
        raise ValueError("Invalid user context: id is required")

    # Only sysadmins may run without a mandate (cross-mandate operations).
    if not (self.mandateId or getattr(currentUser, 'isSysAdmin', False)):
        raise ValueError("Invalid user context: mandateId is required for non-sysadmin users")

    self.userLanguage = currentUser.language

    # Imported inside the method, as in the original code.
    from modules.security.rootAccess import getRootDbAppConnector
    rootConnector = getRootDbAppConnector()
    self.rbac = RbacClass(self.db, dbApp=rootConnector)

    # Let the connector know which user is acting from now on.
    self.db.updateContext(self.userId)
|
|
|
|
def __del__(self):
    """Close the database connection when the instance is collected.

    Does nothing if ``db`` was never set or is None; a failing ``close()``
    is logged rather than raised.
    """
    connection = getattr(self, "db", None)
    if connection is None:
        return
    try:
        connection.close()
    except Exception as e:
        logger.error(f"Error closing database connection: {e}")
|
|
|
|
def _initializeDatabase(self):
    """Create the DatabaseConnector for the ``poweron_trustee`` database and initialise it.

    Host, user, password and port are read from APP_CONFIG; the port falls
    back to 5432. Any exception is logged and re-raised.
    """
    try:
        connectionSettings = {
            "dbHost": APP_CONFIG.get("DB_HOST", "_no_config_default_data"),
            "dbDatabase": "poweron_trustee",
            "dbUser": APP_CONFIG.get("DB_USER"),
            "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_PORT", 5432)),
            "userId": self.userId,
        }
        self.db = DatabaseConnector(**connectionSettings)
        self.db.initDbSystem()
        logger.info(f"Trustee database initialized successfully for user {self.userId}")
    except Exception as e:
        logger.error(f"Failed to initialize Trustee database: {str(e)}")
        raise
|
|
|
|
def checkRbacPermission(
    self,
    modelClass: type,
    operation: str,
    recordId: Optional[str] = None
) -> bool:
    """Tell whether the current user may perform ``operation`` on a model's table.

    Args:
        modelClass: Model class; its ``__name__`` is used as the table name.
        operation: Name of the permission attribute to read (e.g. "create").
        recordId: Accepted for interface compatibility; not used by this method.

    Returns:
        False when no RBAC interface or user is bound, when the user lacks
        ``view`` on the table, or when the operation's level is
        AccessLevel.NONE (also the default for unknown operations).
        True otherwise.
    """
    if not (self.rbac and self.currentUser):
        return False

    granted = self.rbac.getUserPermissions(
        self.currentUser,
        AccessRuleContext.DATA,
        modelClass.__name__
    )

    # Without view permission no other operation is allowed.
    if not granted.view:
        return False

    level = getattr(granted, operation, AccessLevel.NONE)
    return not (level == AccessLevel.NONE)
|
|
|
|
def getRbacAccessLevel(self, modelClass: type, operation: str) -> AccessLevel:
    """Return the RBAC access level of the current user for an operation on a table.

    Args:
        modelClass: Model class; its ``__name__`` is used as the table name.
        operation: Name of the permission attribute to read.

    Returns:
        The AccessLevel stored under ``operation`` (ALL, GROUP, MY, NONE).
        AccessLevel.NONE is returned when no RBAC interface or user is bound,
        when the user lacks ``view`` on the table, or when the permissions
        object has no such attribute.
    """
    if self.rbac and self.currentUser:
        granted = self.rbac.getUserPermissions(
            self.currentUser,
            AccessRuleContext.DATA,
            modelClass.__name__
        )
        if granted.view:
            return getattr(granted, operation, AccessLevel.NONE)

    return AccessLevel.NONE
|
|
|
|
# ===== Organisation CRUD =====
|
|
|
|
def createOrganisation(self, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
    """Create a new organisation after RBAC and ID validation.

    ``data`` is modified in place: ``mandateId`` is always overwritten with
    the interface's mandate, and ``featureInstanceId`` is filled in when absent.

    Args:
        data: Field values for the new organisation. ``id`` must be a string
            of 3-50 characters consisting only of letters, digits, hyphens
            and underscores.

    Returns:
        The created TrusteeOrganisation, or None when the user lacks the
        "create" permission, the ID is invalid, or the connector returns no
        record with an ``id``.
    """
    if not self.checkRbacPermission(TrusteeOrganisation, "create"):
        logger.warning(f"User {self.userId} lacks permission to create organisation")
        return None

    # Set mandateId and featureInstanceId from context for proper data isolation
    data["mandateId"] = self.mandateId
    if "featureInstanceId" not in data:
        data["featureInstanceId"] = self.featureInstanceId

    # A missing or None id is treated as an empty string so that the length
    # check (and its log message) cannot raise TypeError.
    orgId = data.get("id") or ""
    if not isinstance(orgId, str):
        logger.error(f"Invalid organisation ID type: {type(orgId).__name__}")
        return None

    if not 3 <= len(orgId) <= 50:
        logger.error(f"Invalid organisation ID length: {len(orgId)}")
        return None

    import re
    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which would let an ID such as "abc\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', orgId):
        logger.error(f"Invalid organisation ID format: {orgId}")
        return None

    createdRecord = self.db.recordCreate(TrusteeOrganisation, data)
    if createdRecord and createdRecord.get("id"):
        # Keys starting with "_" are connector metadata, not model fields.
        return TrusteeOrganisation(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
    return None
|
|
|
|
def getOrganisation(self, orgId: str) -> Optional[TrusteeOrganisation]:
    """Fetch one organisation by ID.

    Reads directly through ``self.db.getRecordset``; this method itself
    performs no RBAC check.

    Returns:
        The TrusteeOrganisation built from the first matching record (keys
        starting with "_" are dropped), or None when nothing matches.
    """
    matches = self.db.getRecordset(TrusteeOrganisation, recordFilter={"id": orgId})
    if matches:
        publicFields = {name: value for name, value in matches[0].items() if not name.startswith("_")}
        return TrusteeOrganisation(**publicFields)
    return None
|
|
|
|
def getAllOrganisations(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List organisations visible under system RBAC, optionally paginated.

    Note: Organisations are managed at system level (by mandate).
    Feature-level filtering (trustee.access) is NOT applied here because:
    - Admin users with system RBAC can manage all orgs in their mandate
    - trustee.access grants access to specific orgs for other users
    - New organisations wouldn't be visible without an access record

    Args:
        params: Optional pagination; page size defaults to 20 and page to 1
            when the respective value is falsy. Without ``params`` all
            records are returned as a single page.

    Returns:
        PaginatedResult with the page's raw records, the total record count
        and the number of pages.
    """
    logger.debug(f"getAllOrganisations called for user {self.userId}, mandateId: {self.mandateId}")

    # System RBAC filtering (filters by mandate for GROUP access level)
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeOrganisation,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id"
    )
    logger.debug(f"getAllOrganisations: getRecordsetWithRBAC returned {len(records)} records")

    totalItems = len(records)
    if not params:
        pageItems, pageCount = records, 1
    else:
        perPage = params.pageSize or 20
        pageNumber = params.page or 1
        offset = (pageNumber - 1) * perPage
        pageItems = records[offset:offset + perPage]
        pageCount = math.ceil(totalItems / perPage) if perPage > 0 else 1

    return PaginatedResult(
        items=pageItems,
        totalItems=totalItems,
        totalPages=pageCount
    )
|
|
|
|
def updateOrganisation(self, orgId: str, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
    """Update an organisation; its ID cannot be changed.

    ``data`` is modified in place: ``id`` is set to ``orgId``.

    Returns:
        The updated TrusteeOrganisation, or None when the user lacks the
        "update" permission, ``data`` carries a different ``id``, or the
        connector returns nothing.
    """
    if not self.checkRbacPermission(TrusteeOrganisation, "update"):
        logger.warning(f"User {self.userId} lacks permission to update organisation")
        return None

    # An absent id counts as "unchanged"; a different one is rejected.
    if data.get("id", orgId) != orgId:
        logger.error("Organisation ID cannot be changed")
        return None

    data["id"] = orgId
    modified = self.db.recordModify(TrusteeOrganisation, orgId, data)
    if modified:
        publicFields = {name: value for name, value in modified.items() if not name.startswith("_")}
        return TrusteeOrganisation(**publicFields)
    return None
|
|
|
|
def deleteOrganisation(self, orgId: str) -> bool:
    """Delete an organisation.

    Returns:
        The result of ``self.db.recordDelete``, or False when the user lacks
        the "delete" permission.
    """
    if self.checkRbacPermission(TrusteeOrganisation, "delete"):
        return self.db.recordDelete(TrusteeOrganisation, orgId)

    logger.warning(f"User {self.userId} lacks permission to delete organisation")
    return False
|
|
|
|
# ===== Role CRUD =====
|
|
|
|
def createRole(self, data: Dict[str, Any]) -> Optional[TrusteeRole]:
    """Create a new role.

    Requires the RBAC "create" permission on TrusteeRole. ``data`` is
    modified in place: ``mandateId`` is overwritten with the interface's
    mandate and ``featureInstanceId`` is filled in when absent.

    Returns:
        The created TrusteeRole, or None when permission is missing, ``id``
        is empty, or the connector returns no record with an ``id``.
    """
    if not self.checkRbacPermission(TrusteeRole, "create"):
        logger.warning(f"User {self.userId} lacks permission to create role")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)

    if not data.get("id", ""):
        logger.error("Role ID is required")
        return None

    created = self.db.recordCreate(TrusteeRole, data)
    if not (created and created.get("id")):
        return None
    # Keys starting with "_" are connector metadata, not model fields.
    return TrusteeRole(**{name: value for name, value in created.items() if not name.startswith("_")})
|
|
|
|
def getRole(self, roleId: str) -> Optional[TrusteeRole]:
    """Fetch one role by ID.

    Reads directly through ``self.db.getRecordset``; this method itself
    performs no RBAC check.

    Returns:
        The TrusteeRole built from the first matching record (keys starting
        with "_" are dropped), or None when nothing matches.
    """
    matches = self.db.getRecordset(TrusteeRole, recordFilter={"id": roleId})
    if matches:
        publicFields = {name: value for name, value in matches[0].items() if not name.startswith("_")}
        return TrusteeRole(**publicFields)
    return None
|
|
|
|
def getAllRoles(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List roles under system RBAC, optionally paginated.

    Roles are not filtered by organisation. Users whose "read" access level
    on TrusteeRole is ALL see every role returned by the RBAC query; any
    other user sees them only if ``getAllUserAccess`` returns at least one
    entry for them, and an empty list otherwise.

    Args:
        params: Optional pagination; page size defaults to 20 and page to 1
            when the respective value is falsy.

    Returns:
        PaginatedResult with the page's raw records, the total record count
        and the number of pages.
    """
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeRole,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id"
    )

    # Without ALL access a user needs at least one trustee access entry.
    lacksGlobalAccess = self.getRbacAccessLevel(TrusteeRole, "read") != AccessLevel.ALL
    if lacksGlobalAccess and not self.getAllUserAccess(self.userId):
        records = []

    totalItems = len(records)
    if not params:
        pageItems, pageCount = records, 1
    else:
        perPage = params.pageSize or 20
        pageNumber = params.page or 1
        offset = (pageNumber - 1) * perPage
        pageItems = records[offset:offset + perPage]
        pageCount = math.ceil(totalItems / perPage) if perPage > 0 else 1

    return PaginatedResult(
        items=pageItems,
        totalItems=totalItems,
        totalPages=pageCount
    )
|
|
|
|
def updateRole(self, roleId: str, data: Dict[str, Any]) -> Optional[TrusteeRole]:
    """Update a role.

    Requires the RBAC "update" permission on TrusteeRole. ``data`` is
    modified in place: ``id`` is set to ``roleId``.

    Returns:
        The updated TrusteeRole, or None when permission is missing or the
        connector returns nothing.
    """
    if not self.checkRbacPermission(TrusteeRole, "update"):
        logger.warning(f"User {self.userId} lacks permission to update role")
        return None

    data["id"] = roleId
    modified = self.db.recordModify(TrusteeRole, roleId, data)
    if modified:
        publicFields = {name: value for name, value in modified.items() if not name.startswith("_")}
        return TrusteeRole(**publicFields)
    return None
|
|
|
|
def deleteRole(self, roleId: str) -> bool:
    """Delete a role unless an access record still refers to it.

    Requires the RBAC "delete" permission on TrusteeRole.

    Returns:
        The result of ``self.db.recordDelete``, or False when permission is
        missing or any TrusteeAccess record carries this ``roleId``.
    """
    if not self.checkRbacPermission(TrusteeRole, "delete"):
        logger.warning(f"User {self.userId} lacks permission to delete role")
        return False

    # A role that is still assigned must not be removed.
    if self.db.getRecordset(TrusteeAccess, recordFilter={"roleId": roleId}):
        logger.error(f"Cannot delete role {roleId}: still in use")
        return False

    return self.db.recordDelete(TrusteeRole, roleId)
|
|
|
|
# ===== Access CRUD =====
|
|
|
|
def createAccess(self, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
    """Create an access record.

    Requires the system RBAC "create" permission on TrusteeAccess. Unless
    the caller's "create" access level is ALL, they must also hold the
    "admin" role for ``data["organisationId"]``.

    ``data`` is modified in place: ``mandateId`` is overwritten,
    ``featureInstanceId`` is filled in when absent, and ``id`` receives a
    fresh UUID4 string when missing or falsy.

    Returns:
        The created TrusteeAccess, or None when a permission check fails or
        the connector returns no record with an ``id``.
    """
    # System-level RBAC comes first.
    if not self.checkRbacPermission(TrusteeAccess, "create"):
        logger.warning(f"User {self.userId} lacks system permission to create access")
        return None

    organisationId = data.get("organisationId")

    # ALL access skips the feature-level (per-organisation admin) check.
    hasGlobalAccess = self.getRbacAccessLevel(TrusteeAccess, "create") == AccessLevel.ALL
    if not hasGlobalAccess and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    created = self.db.recordCreate(TrusteeAccess, data)
    if not (created and created.get("id")):
        return None
    return TrusteeAccess(**{name: value for name, value in created.items() if not name.startswith("_")})
|
|
|
|
def getAccess(self, accessId: str) -> Optional[TrusteeAccess]:
    """Fetch one access record by ID.

    Reads directly through ``self.db.getRecordset``; this method itself
    performs no RBAC check.

    Returns:
        The TrusteeAccess built from the first matching record (keys
        starting with "_" are dropped), or None when nothing matches.
    """
    matches = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    if matches:
        publicFields = {name: value for name, value in matches[0].items() if not name.startswith("_")}
        return TrusteeAccess(**publicFields)
    return None
|
|
|
|
def getAllAccess(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List access records with system RBAC plus feature-level filtering.

    Users whose "read" access level on TrusteeAccess is ALL see every
    record returned by the RBAC query. Everyone else only sees records of
    organisations for which one of their own access entries has
    ``roleId == "admin"``.

    Args:
        params: Optional pagination; page size defaults to 20 and page to 1
            when the respective value is falsy.

    Returns:
        PaginatedResult with the page's raw records, the total record count
        and the number of pages.
    """
    # Step 1: system RBAC filtering.
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id"
    )

    # Step 2: feature-level filtering for everyone without ALL access.
    if self.getRbacAccessLevel(TrusteeAccess, "read") != AccessLevel.ALL:
        adminOrgs = {
            entry.get("organisationId")
            for entry in self.getAllUserAccess(self.userId)
            if entry.get("roleId") == "admin"
        }
        # An empty set filters every record out.
        records = [row for row in records if row.get("organisationId") in adminOrgs]

    totalItems = len(records)
    if not params:
        pageItems, pageCount = records, 1
    else:
        perPage = params.pageSize or 20
        pageNumber = params.page or 1
        offset = (pageNumber - 1) * perPage
        pageItems = records[offset:offset + perPage]
        pageCount = math.ceil(totalItems / perPage) if perPage > 0 else 1

    return PaginatedResult(
        items=pageItems,
        totalItems=totalItems,
        totalPages=pageCount
    )
|
|
|
|
def getAccessByOrganisation(self, organisationId: str) -> List[TrusteeAccess]:
    """List the access records of one organisation.

    The caller must pass ``checkUserTrusteePermission`` with the "admin"
    role for the organisation; otherwise an empty list is returned.

    Returns:
        TrusteeAccess objects for the organisation, ordered by ``id``, as
        returned by the RBAC-filtered query.
    """
    isOrgAdmin = self.checkUserTrusteePermission(self.userId, organisationId, "admin")
    if not isOrgAdmin:
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return []

    rows = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        recordFilter={"organisationId": organisationId},
        orderBy="id"
    )

    accessList = []
    for row in rows:
        # Keys starting with "_" are connector metadata, not model fields.
        publicFields = {name: value for name, value in row.items() if not name.startswith("_")}
        accessList.append(TrusteeAccess(**publicFields))
    return accessList
|
|
|
|
def getAccessByUser(self, userId: str) -> List[TrusteeAccess]:
    """List the access records of one user.

    Callers whose "read" access level on TrusteeAccess is ALL get every
    record returned by the RBAC query. Everyone else only gets records of
    organisations for which one of their own access entries has
    ``roleId == "admin"``.

    Args:
        userId: The user whose access records are requested (not
            necessarily the current user).

    Returns:
        The matching TrusteeAccess objects, ordered by ``id``.
    """
    rows = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        recordFilter={"userId": userId},
        orderBy="id"
    )

    if self.getRbacAccessLevel(TrusteeAccess, "read") != AccessLevel.ALL:
        # Restrict to organisations the *current* user administers.
        adminOrgs = {
            entry.get("organisationId")
            for entry in self.getAllUserAccess(self.userId)
            if entry.get("roleId") == "admin"
        }
        rows = [row for row in rows if row.get("organisationId") in adminOrgs]

    return [
        TrusteeAccess(**{name: value for name, value in row.items() if not name.startswith("_")})
        for row in rows
    ]
|
|
|
|
def updateAccess(self, accessId: str, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
    """Update an access record.

    Requires the system RBAC "update" permission on TrusteeAccess. Callers
    whose "update" access level is not ALL must additionally hold the
    "admin" role for the record's current organisation and, when ``data``
    moves the record to a different organisation, for that target
    organisation as well. Without the second check an admin of one
    organisation could re-point a record at an organisation they do not
    administer, which createAccess would refuse.

    ``data`` is modified in place: ``id`` is set to ``accessId``.

    Args:
        accessId: ID of the access record to update.
        data: Field values to write.

    Returns:
        The updated TrusteeAccess, or None when a permission check fails,
        the record does not exist, or the connector returns nothing.
    """
    # Check system RBAC first
    if not self.checkRbacPermission(TrusteeAccess, "update"):
        logger.warning(f"User {self.userId} lacks system permission to update access")
        return None

    # Load the stored record: permissions are judged against its organisation.
    existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    existing = existingRecords[0] if existingRecords else None

    if not existing:
        logger.warning(f"Access record {accessId} not found")
        return None

    organisationId = existing.get("organisationId")
    # Organisation the record would belong to after the update.
    targetOrganisationId = data.get("organisationId", organisationId)

    # Users with ALL access level bypass the feature-level permission checks
    accessLevel = self.getRbacAccessLevel(TrusteeAccess, "update")

    if accessLevel != AccessLevel.ALL:
        if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
            logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
            return None

        # Moving the record is equivalent to creating access in the target
        # organisation, so the same admin requirement applies there.
        if targetOrganisationId != organisationId and not self.checkUserTrusteePermission(
            self.userId, targetOrganisationId, "admin"
        ):
            logger.warning(f"User {self.userId} lacks admin role for organisation {targetOrganisationId}")
            return None

    data["id"] = accessId
    updatedRecord = self.db.recordModify(TrusteeAccess, accessId, data)
    if not updatedRecord:
        return None
    # Keys starting with "_" are connector metadata, not model fields.
    return TrusteeAccess(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
|
|
|
|
def deleteAccess(self, accessId: str) -> bool:
    """Delete an access record.

    Requires the system RBAC "delete" permission on TrusteeAccess. Unless
    the caller's "delete" access level is ALL, they must also hold the
    "admin" role for the record's organisation.

    Returns:
        The result of ``self.db.recordDelete``, or False when a permission
        check fails or the record does not exist.
    """
    # System-level RBAC comes first.
    if not self.checkRbacPermission(TrusteeAccess, "delete"):
        logger.warning(f"User {self.userId} lacks system permission to delete access")
        return False

    # The stored record tells us which organisation is affected.
    matches = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    stored = matches[0] if matches else None
    if not stored:
        logger.warning(f"Access record {accessId} not found")
        return False

    organisationId = stored.get("organisationId")

    # ALL access skips the feature-level (per-organisation admin) check.
    hasGlobalAccess = self.getRbacAccessLevel(TrusteeAccess, "delete") == AccessLevel.ALL
    if not hasGlobalAccess and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return False

    return self.db.recordDelete(TrusteeAccess, accessId)
|
|
|
|
# ===== Contract CRUD =====
|
|
|
|
def createContract(self, data: Dict[str, Any]) -> Optional[TrusteeContract]:
    """Create a contract for the organisation named in ``data``.

    Permission is decided by ``checkCombinedPermission`` for "create" on
    TrusteeContract and ``data["organisationId"]``.

    ``data`` is modified in place: ``mandateId`` is overwritten,
    ``featureInstanceId`` is filled in when absent, and ``id`` receives a
    fresh UUID4 string when missing or falsy.

    Returns:
        The created TrusteeContract, or None when permission is denied or
        the connector returns no record with an ``id``.
    """
    organisationId = data.get("organisationId")

    # Combined check: system RBAC plus feature-level access.
    if not self.checkCombinedPermission(TrusteeContract, "create", organisationId):
        logger.warning(f"User {self.userId} lacks permission to create contract for org {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    created = self.db.recordCreate(TrusteeContract, data)
    if not (created and created.get("id")):
        return None
    return TrusteeContract(**{name: value for name, value in created.items() if not name.startswith("_")})
|
|
|
|
def getContract(self, contractId: str) -> Optional[TrusteeContract]:
    """Fetch one contract by ID.

    Reads directly through ``self.db.getRecordset``; this method itself
    performs no RBAC check.

    Returns:
        The TrusteeContract built from the first matching record (keys
        starting with "_" are dropped), or None when nothing matches.
    """
    matches = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    if matches:
        publicFields = {name: value for name, value in matches[0].items() if not name.startswith("_")}
        return TrusteeContract(**publicFields)
    return None
|
|
|
|
def getAllContracts(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List contracts after system RBAC and trustee-access filtering, optionally paginated.

    Args:
        params: Optional pagination; page size defaults to 20 and page to 1
            when the respective value is falsy.

    Returns:
        PaginatedResult with the page's raw records, the total record count
        and the number of pages.
    """
    # Step 1: system RBAC filtering.
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeContract,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id"
    )

    # Step 2: feature-level filtering based on trustee.access.
    records = self.filterRecordsByTrusteeAccess(rbacRecords, TrusteeContract)

    totalItems = len(records)
    if not params:
        pageItems, pageCount = records, 1
    else:
        perPage = params.pageSize or 20
        pageNumber = params.page or 1
        offset = (pageNumber - 1) * perPage
        pageItems = records[offset:offset + perPage]
        pageCount = math.ceil(totalItems / perPage) if perPage > 0 else 1

    return PaginatedResult(
        items=pageItems,
        totalItems=totalItems,
        totalPages=pageCount
    )
|
|
|
|
def getContractsByOrganisation(self, organisationId: str) -> List[TrusteeContract]:
    """List the contracts of one organisation, ordered by ``label``.

    Records pass through system RBAC first and are then narrowed by
    ``filterRecordsByTrusteeAccess``.

    Returns:
        The remaining records as TrusteeContract objects.
    """
    # Step 1: system RBAC filtering.
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeContract,
        currentUser=self.currentUser,
        recordFilter={"organisationId": organisationId},
        orderBy="label"
    )

    # Step 2: feature-level filtering based on trustee.access.
    contracts = []
    for row in self.filterRecordsByTrusteeAccess(rbacRecords, TrusteeContract):
        publicFields = {name: value for name, value in row.items() if not name.startswith("_")}
        contracts.append(TrusteeContract(**publicFields))
    return contracts
|
|
|
|
def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[TrusteeContract]:
    """Update a contract; its ``organisationId`` is immutable.

    Permission is decided by ``checkCombinedPermission`` for "update" on
    TrusteeContract and the stored organisation. ``data`` is modified in
    place: ``id`` is set to ``contractId``.

    Returns:
        The updated TrusteeContract, or None when the contract does not
        exist, permission is denied, ``data`` carries a different
        ``organisationId``, or the connector returns nothing.
    """
    # The stored record tells us which organisation is affected.
    matches = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    stored = matches[0] if matches else None
    if not stored:
        logger.warning(f"Contract {contractId} not found")
        return None

    organisationId = stored.get("organisationId")

    # Combined check: system RBAC plus feature-level access.
    if not self.checkCombinedPermission(TrusteeContract, "update", organisationId):
        logger.warning(f"User {self.userId} lacks permission to update contract in org {organisationId}")
        return None

    # An absent organisationId counts as "unchanged"; a different one is rejected.
    if data.get("organisationId", organisationId) != organisationId:
        logger.error("Contract organisationId cannot be changed after creation")
        return None

    data["id"] = contractId
    modified = self.db.recordModify(TrusteeContract, contractId, data)
    if modified:
        publicFields = {name: value for name, value in modified.items() if not name.startswith("_")}
        return TrusteeContract(**publicFields)
    return None
|
|
|
|
def deleteContract(self, contractId: str) -> bool:
    """Delete a contract.

    Permission is decided by ``checkCombinedPermission`` for "delete" on
    TrusteeContract and the stored organisation.

    Returns:
        The result of ``self.db.recordDelete``, or False when the contract
        does not exist or permission is denied.
    """
    # The stored record tells us which organisation is affected.
    matches = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    stored = matches[0] if matches else None
    if not stored:
        logger.warning(f"Contract {contractId} not found")
        return False

    organisationId = stored.get("organisationId")

    # Combined check: system RBAC plus feature-level access.
    mayDelete = self.checkCombinedPermission(TrusteeContract, "delete", organisationId)
    if not mayDelete:
        logger.warning(f"User {self.userId} lacks permission to delete contract in org {organisationId}")
        return False

    return self.db.recordDelete(TrusteeContract, contractId)
|
|
|
|
# ===== Document CRUD =====
|
|
|
|
def createDocument(self, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
    """Create a document.

    Permission is decided by ``checkCombinedPermission`` for "create" on
    TrusteeDocument, using ``organisationId`` and ``contractId`` from ``data``.

    ``data`` is modified in place: ``mandateId`` is overwritten,
    ``featureInstanceId`` is filled in when absent, and ``id`` receives a
    fresh UUID4 string when missing or falsy.

    Returns:
        The created TrusteeDocument without ``documentData``, or None when
        permission is denied or the connector returns no record with an ``id``.
    """
    organisationId = data.get("organisationId")
    contractId = data.get("contractId")

    # Combined check: system RBAC plus feature-level access.
    if not self.checkCombinedPermission(TrusteeDocument, "create", organisationId, contractId):
        logger.warning(f"User {self.userId} lacks permission to create document in org {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    created = self.db.recordCreate(TrusteeDocument, data)
    if not (created and created.get("id")):
        return None

    # The model is built without the binary payload and "_" metadata keys.
    metadataOnly = {
        name: value
        for name, value in created.items()
        if name != "documentData" and not name.startswith("_")
    }
    return TrusteeDocument(**metadataOnly)
|
|
|
|
def getDocument(self, documentId: str) -> Optional[TrusteeDocument]:
    """Fetch one document's metadata by ID.

    Reads directly through ``self.db.getRecordset``; this method itself
    performs no RBAC check.

    Returns:
        The TrusteeDocument built from the first matching record, without
        ``documentData`` and without keys starting with "_"; None when
        nothing matches.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if matches:
        metadataOnly = {
            name: value
            for name, value in matches[0].items()
            if name != "documentData" and not name.startswith("_")
        }
        return TrusteeDocument(**metadataOnly)
    return None
|
|
|
|
def getDocumentData(self, documentId: str) -> Optional[bytes]:
    """Return the binary payload of a document.

    Reads directly through ``self.db.getRecordset``; this method itself
    performs no RBAC check.
    NOTE(review): confirm that callers authorise access before calling this.

    Returns:
        The record's ``documentData`` value, or None when no record matches
        or the first record is empty.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if not matches:
        return None

    stored = matches[0]
    return stored.get("documentData") if stored else None
|
|
|
|
def getAllDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List document metadata after system RBAC and trustee-access filtering.

    ``documentData`` is removed from every remaining record before the
    result is paginated.

    Args:
        params: Optional pagination; page size defaults to 20 and page to 1
            when the respective value is falsy.

    Returns:
        PaginatedResult with the page's raw records, the total record count
        and the number of pages.
    """
    # Step 1: system RBAC filtering.
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeDocument,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="documentName"
    )

    # Step 2: feature-level filtering based on trustee.access.
    # Per the original note, this is where userreport users are limited to
    # their own records; the helper is defined outside this excerpt.
    records = self.filterRecordsByTrusteeAccess(rbacRecords, TrusteeDocument)

    # Strip the binary payload in place so it never reaches the response.
    for row in records:
        row.pop("documentData", None)

    totalItems = len(records)
    if not params:
        pageItems, pageCount = records, 1
    else:
        perPage = params.pageSize or 20
        pageNumber = params.page or 1
        offset = (pageNumber - 1) * perPage
        pageItems = records[offset:offset + perPage]
        pageCount = math.ceil(totalItems / perPage) if perPage > 0 else 1

    return PaginatedResult(
        items=pageItems,
        totalItems=totalItems,
        totalPages=pageCount
    )
|
|
|
|
def getDocumentsByContract(self, contractId: str) -> List[TrusteeDocument]:
    """List the documents of one contract, ordered by ``documentName``.

    Records pass through system RBAC first and are then narrowed by
    ``filterRecordsByTrusteeAccess``.

    Returns:
        The remaining records as TrusteeDocument objects, without
        ``documentData`` and without keys starting with "_".
    """
    # Step 1: system RBAC filtering.
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeDocument,
        currentUser=self.currentUser,
        recordFilter={"contractId": contractId},
        orderBy="documentName"
    )

    # Step 2: feature-level filtering based on trustee.access.
    visible = self.filterRecordsByTrusteeAccess(rbacRecords, TrusteeDocument)

    return [
        TrusteeDocument(**{
            name: value
            for name, value in row.items()
            if name != "documentData" and not name.startswith("_")
        })
        for row in visible
    ]
|
|
|
|
def updateDocument(self, documentId: str, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
    """Update a document after checking permission on its current and target scope.

    The existing record is loaded to obtain its organisation, contract and
    creator, which feed the combined (system RBAC + trustee.access) permission
    check. If the payload reassigns ``organisationId`` or ``contractId``, the
    same check is repeated for the target scope so a record cannot be moved
    into an organisation/contract the user has no rights for.

    Args:
        documentId: ID of the document to update.
        data: Fields to modify. The dict is updated in place with ``id``.

    Returns:
        The updated TrusteeDocument (without ``documentData`` and without
        ``_``-prefixed keys), or None when the document does not exist,
        permission is denied or the connector returns no record.
    """
    # Get existing document to check organisation and creator
    existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    existing = existingRecords[0] if existingRecords else None

    if not existing:
        logger.warning(f"Document {documentId} not found")
        return None

    organisationId = existing.get("organisationId")
    contractId = existing.get("contractId")
    createdBy = existing.get("_createdBy")

    # Check combined permission (system RBAC + feature-level)
    # For userreport, this checks if they created the record
    if not self.checkCombinedPermission(TrusteeDocument, "update", organisationId, contractId, createdBy):
        logger.warning(f"User {self.userId} lacks permission to update document in org {organisationId}")
        return None

    # The permission above only covers where the record lives today. When the
    # payload moves it elsewhere, the user must also be allowed to update
    # records in the destination scope.
    targetOrganisationId = data.get("organisationId", organisationId)
    targetContractId = data.get("contractId", contractId)
    if (targetOrganisationId, targetContractId) != (organisationId, contractId):
        if not self.checkCombinedPermission(
            TrusteeDocument, "update", targetOrganisationId, targetContractId, createdBy
        ):
            logger.warning(f"User {self.userId} lacks permission to move document to org {targetOrganisationId}")
            return None

    data["id"] = documentId
    updatedRecord = self.db.recordModify(TrusteeDocument, documentId, data)
    if not updatedRecord:
        return None

    cleanedRecord = {k: v for k, v in updatedRecord.items() if not k.startswith("_") and k != "documentData"}
    return TrusteeDocument(**cleanedRecord)
|
|
|
|
def deleteDocument(self, documentId: str) -> bool:
    """Delete a document if it exists and the current user may delete it.

    The stored record supplies organisation, contract and creator for the
    combined permission check (system RBAC + trustee.access; for the
    userreport role the creator must be the current user).

    Args:
        documentId: ID of the document to remove.

    Returns:
        The result of the connector's ``recordDelete`` call, or False when
        the document is missing or permission is denied.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if not matches or not matches[0]:
        logger.warning(f"Document {documentId} not found")
        return False

    current = matches[0]
    organisationId = current.get("organisationId")
    mayDelete = self.checkCombinedPermission(
        TrusteeDocument,
        "delete",
        organisationId,
        current.get("contractId"),
        current.get("_createdBy"),
    )
    if mayDelete:
        return self.db.recordDelete(TrusteeDocument, documentId)

    logger.warning(f"User {self.userId} lacks permission to delete document in org {organisationId}")
    return False
|
|
|
|
# ===== Position CRUD =====
|
|
|
|
def createPosition(self, data: Dict[str, Any]) -> Optional[TrusteePosition]:
    """Create a new position after a combined permission check.

    The payload is completed in place: ``mandateId`` is always set from this
    interface, ``featureInstanceId`` only when absent, ``vatAmount`` is
    derived from ``bookingAmount * vatPercentage / 100`` when it is missing,
    None or zero, and ``id`` is generated when not supplied.

    Args:
        data: Position fields; ``organisationId`` and ``contractId`` are used
            for the permission check. The dict is modified in place.

    Returns:
        The created TrusteePosition (without ``_``-prefixed keys), or None
        when permission is denied or the connector returns no record with an
        ``id``.
    """
    organisationId = data.get("organisationId")
    contractId = data.get("contractId")

    # Check combined permission (system RBAC + feature-level)
    if not self.checkCombinedPermission(TrusteePosition, "create", organisationId, contractId):
        logger.warning(f"User {self.userId} lacks permission to create position in org {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    if "featureInstanceId" not in data:
        data["featureInstanceId"] = self.featureInstanceId

    # Calculate VAT amount if not provided. Explicit None values (e.g. from a
    # serialized model with unset optional fields) are treated like missing
    # keys so the multiplication cannot fail on None.
    if not data.get("vatAmount"):
        bookingAmount = data.get("bookingAmount") or 0
        vatPercentage = data.get("vatPercentage") or 0
        data["vatAmount"] = bookingAmount * vatPercentage / 100

    # ``uuid`` is imported at module level; no function-local import needed.
    data["id"] = data.get("id") or str(uuid.uuid4())

    createdRecord = self.db.recordCreate(TrusteePosition, data)
    if createdRecord and createdRecord.get("id"):
        return TrusteePosition(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
    return None
|
|
|
|
def getPosition(self, positionId: str) -> Optional[TrusteePosition]:
    """Fetch a single position by ID straight from the database connector.

    NOTE(review): unlike the list getters, no RBAC or trustee.access
    filtering is visible in this method - confirm callers enforce access.

    Args:
        positionId: ID of the position to load.

    Returns:
        The TrusteePosition built from the first matching row (keys starting
        with ``_`` removed), or None when no row matches.
    """
    rows = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    if rows:
        firstRow = rows[0]
        publicFields = {name: value for name, value in firstRow.items() if not name.startswith("_")}
        return TrusteePosition(**publicFields)
    return None
|
|
|
|
def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """Return the positions visible to the current user.

    Records are loaded through system RBAC ordered by ``valuta`` and then
    narrowed by ``filterRecordsByTrusteeAccess`` before the optional
    pagination is applied.

    Args:
        params: Optional pagination settings. When omitted, every visible
            record is returned as a single page. A missing, zero or negative
            ``pageSize`` falls back to 20; a missing or non-positive ``page``
            falls back to 1.

    Returns:
        PaginatedResult carrying the requested slice in ``items``, the count
        after filtering in ``totalItems`` and the resulting ``totalPages``.
    """
    # Step 1: System RBAC filtering
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="valuta",
    )

    # Step 2: Feature-level filtering based on trustee.access
    # (for the userreport role this keeps only the user's own records)
    records = self.filterRecordsByTrusteeAccess(records, TrusteePosition)

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    # Clamp both values: a page below 1 or a negative page size would turn
    # into negative slice indices and silently return rows from the tail of
    # the list (or nothing at all).
    pageSize = params.pageSize if params.pageSize and params.pageSize > 0 else 20
    page = max(params.page or 1, 1)
    startIdx = (page - 1) * pageSize

    return PaginatedResult(
        items=records[startIdx:startIdx + pageSize],
        totalItems=totalItems,
        totalPages=math.ceil(totalItems / pageSize),
    )
|
|
|
|
def getPositionsByContract(self, contractId: str) -> List[TrusteePosition]:
    """Return the positions of one contract that the current user may see.

    Rows come from system RBAC (filtered on ``contractId``, ordered by
    ``valuta``) and are narrowed by the trustee.access filter. Keys starting
    with ``_`` are dropped before model construction.

    Args:
        contractId: ID of the contract whose positions are requested.

    Returns:
        List of TrusteePosition models.
    """
    rbacRows = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        recordFilter={"contractId": contractId},
        orderBy="valuta",
    )

    positions = []
    for row in self.filterRecordsByTrusteeAccess(rbacRows, TrusteePosition):
        publicFields = {name: value for name, value in row.items() if not name.startswith("_")}
        positions.append(TrusteePosition(**publicFields))
    return positions
|
|
|
|
def getPositionsByOrganisation(self, organisationId: str) -> List[TrusteePosition]:
    """Return the positions of one organisation that the current user may see.

    Rows come from system RBAC (filtered on ``organisationId``, ordered by
    ``valuta``) and are narrowed by the trustee.access filter. Keys starting
    with ``_`` are dropped before model construction.

    Args:
        organisationId: ID of the organisation whose positions are requested.

    Returns:
        List of TrusteePosition models.
    """
    rbacRows = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        recordFilter={"organisationId": organisationId},
        orderBy="valuta",
    )

    positions = []
    for row in self.filterRecordsByTrusteeAccess(rbacRows, TrusteePosition):
        publicFields = {name: value for name, value in row.items() if not name.startswith("_")}
        positions.append(TrusteePosition(**publicFields))
    return positions
|
|
|
|
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]:
    """Update a position after checking permission on its current and target scope.

    The existing record is loaded to obtain its organisation, contract and
    creator, which feed the combined (system RBAC + trustee.access) permission
    check. If the payload reassigns ``organisationId`` or ``contractId``, the
    same check is repeated for the target scope so a record cannot be moved
    into an organisation/contract the user has no rights for.

    Args:
        positionId: ID of the position to update.
        data: Fields to modify. The dict is updated in place with ``id``.

    Returns:
        The updated TrusteePosition (without ``_``-prefixed keys), or None
        when the position does not exist, permission is denied or the
        connector returns no record.
    """
    # Get existing position to check organisation and creator
    existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    existing = existingRecords[0] if existingRecords else None

    if not existing:
        logger.warning(f"Position {positionId} not found")
        return None

    organisationId = existing.get("organisationId")
    contractId = existing.get("contractId")
    createdBy = existing.get("_createdBy")

    # Check combined permission (system RBAC + feature-level)
    # For userreport, this checks if they created the record
    if not self.checkCombinedPermission(TrusteePosition, "update", organisationId, contractId, createdBy):
        logger.warning(f"User {self.userId} lacks permission to update position in org {organisationId}")
        return None

    # The permission above only covers where the record lives today. When the
    # payload moves it elsewhere, the user must also be allowed to update
    # records in the destination scope.
    targetOrganisationId = data.get("organisationId", organisationId)
    targetContractId = data.get("contractId", contractId)
    if (targetOrganisationId, targetContractId) != (organisationId, contractId):
        if not self.checkCombinedPermission(
            TrusteePosition, "update", targetOrganisationId, targetContractId, createdBy
        ):
            logger.warning(f"User {self.userId} lacks permission to move position to org {targetOrganisationId}")
            return None

    data["id"] = positionId
    updatedRecord = self.db.recordModify(TrusteePosition, positionId, data)
    if not updatedRecord:
        return None

    return TrusteePosition(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
|
|
|
|
def deletePosition(self, positionId: str) -> bool:
    """Delete a position if it exists and the current user may delete it.

    The stored record supplies organisation, contract and creator for the
    combined permission check (system RBAC + trustee.access; for the
    userreport role the creator must be the current user).

    Args:
        positionId: ID of the position to remove.

    Returns:
        The result of the connector's ``recordDelete`` call, or False when
        the position is missing or permission is denied.
    """
    matches = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    if not matches or not matches[0]:
        logger.warning(f"Position {positionId} not found")
        return False

    current = matches[0]
    organisationId = current.get("organisationId")
    mayDelete = self.checkCombinedPermission(
        TrusteePosition,
        "delete",
        organisationId,
        current.get("contractId"),
        current.get("_createdBy"),
    )
    if mayDelete:
        return self.db.recordDelete(TrusteePosition, positionId)

    logger.warning(f"User {self.userId} lacks permission to delete position in org {organisationId}")
    return False
|
|
|
|
# ===== Position-Document Link CRUD =====
|
|
|
|
def createPositionDocument(self, data: Dict[str, Any]) -> Optional[TrusteePositionDocument]:
    """Create a new position-document link after a combined permission check.

    The payload is completed in place: ``mandateId`` is always set from this
    interface, ``featureInstanceId`` only when absent, and ``id`` is
    generated when not supplied.

    Args:
        data: Link fields; ``organisationId`` and ``contractId`` (if present)
            are used for the permission check. The dict is modified in place.

    Returns:
        The created TrusteePositionDocument (without ``_``-prefixed keys), or
        None when permission is denied or the connector returns no record
        with an ``id``.
    """
    organisationId = data.get("organisationId")
    contractId = data.get("contractId")

    # Check combined permission (system RBAC + feature-level)
    if not self.checkCombinedPermission(TrusteePositionDocument, "create", organisationId, contractId):
        logger.warning(f"User {self.userId} lacks permission to create position-document link in org {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    if "featureInstanceId" not in data:
        data["featureInstanceId"] = self.featureInstanceId

    # ``uuid`` is imported at module level; the former function-local import
    # only shadowed it.
    data["id"] = data.get("id") or str(uuid.uuid4())

    createdRecord = self.db.recordCreate(TrusteePositionDocument, data)
    if createdRecord and createdRecord.get("id"):
        return TrusteePositionDocument(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
    return None
|
|
|
|
def getPositionDocument(self, linkId: str) -> Optional[TrusteePositionDocument]:
    """Fetch a single position-document link by ID from the database connector.

    NOTE(review): unlike the list getters, no RBAC or trustee.access
    filtering is visible in this method - confirm callers enforce access.

    Args:
        linkId: ID of the link record to load.

    Returns:
        The TrusteePositionDocument built from the first matching row (keys
        starting with ``_`` removed), or None when no row matches.
    """
    rows = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
    if rows:
        firstRow = rows[0]
        publicFields = {name: value for name, value in firstRow.items() if not name.startswith("_")}
        return TrusteePositionDocument(**publicFields)
    return None
|
|
|
|
def getAllPositionDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """Return the position-document links visible to the current user.

    Records are loaded through system RBAC ordered by ``id`` and then
    narrowed by ``filterRecordsByTrusteeAccess`` before the optional
    pagination is applied.

    Args:
        params: Optional pagination settings. When omitted, every visible
            record is returned as a single page. A missing, zero or negative
            ``pageSize`` falls back to 20; a missing or non-positive ``page``
            falls back to 1.

    Returns:
        PaginatedResult carrying the requested slice in ``items``, the count
        after filtering in ``totalItems`` and the resulting ``totalPages``.
    """
    # Step 1: System RBAC filtering
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePositionDocument,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id",
    )

    # Step 2: Feature-level filtering based on trustee.access
    # (for the userreport role this keeps only the user's own records)
    records = self.filterRecordsByTrusteeAccess(records, TrusteePositionDocument)

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    # Clamp both values: a page below 1 or a negative page size would turn
    # into negative slice indices and silently return rows from the tail of
    # the list (or nothing at all).
    pageSize = params.pageSize if params.pageSize and params.pageSize > 0 else 20
    page = max(params.page or 1, 1)
    startIdx = (page - 1) * pageSize

    return PaginatedResult(
        items=records[startIdx:startIdx + pageSize],
        totalItems=totalItems,
        totalPages=math.ceil(totalItems / pageSize),
    )
|
|
|
|
def getDocumentsForPosition(self, positionId: str) -> List[TrusteePositionDocument]:
    """Return the position-document links that belong to one position.

    The result contains link records (``TrusteePositionDocument``), not the
    documents themselves. Rows come from system RBAC (filtered on
    ``positionId``, ordered by ``id``) and are narrowed by the trustee.access
    filter; keys starting with ``_`` are dropped before model construction.

    Args:
        positionId: ID of the position whose links are requested.

    Returns:
        List of TrusteePositionDocument models.
    """
    rbacLinks = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePositionDocument,
        currentUser=self.currentUser,
        recordFilter={"positionId": positionId},
        orderBy="id",
    )

    linkModels = []
    for row in self.filterRecordsByTrusteeAccess(rbacLinks, TrusteePositionDocument):
        publicFields = {name: value for name, value in row.items() if not name.startswith("_")}
        linkModels.append(TrusteePositionDocument(**publicFields))
    return linkModels
|
|
|
|
def getPositionsForDocument(self, documentId: str) -> List[TrusteePositionDocument]:
    """Return the position-document links that reference one document.

    The result contains link records (``TrusteePositionDocument``), not the
    positions themselves. Rows come from system RBAC (filtered on
    ``documentId``, ordered by ``id``) and are narrowed by the trustee.access
    filter; keys starting with ``_`` are dropped before model construction.

    Args:
        documentId: ID of the document whose links are requested.

    Returns:
        List of TrusteePositionDocument models.
    """
    rbacLinks = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePositionDocument,
        currentUser=self.currentUser,
        recordFilter={"documentId": documentId},
        orderBy="id",
    )

    linkModels = []
    for row in self.filterRecordsByTrusteeAccess(rbacLinks, TrusteePositionDocument):
        publicFields = {name: value for name, value in row.items() if not name.startswith("_")}
        linkModels.append(TrusteePositionDocument(**publicFields))
    return linkModels
|
|
|
|
def deletePositionDocument(self, linkId: str) -> bool:
    """Delete a position-document link if it exists and the user may delete it.

    The stored record supplies organisation, contract and creator for the
    combined permission check (system RBAC + trustee.access; for the
    userreport role the creator must be the current user).

    Args:
        linkId: ID of the link record to remove.

    Returns:
        The result of the connector's ``recordDelete`` call, or False when
        the link is missing or permission is denied.
    """
    matches = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
    if not matches or not matches[0]:
        logger.warning(f"Position-document link {linkId} not found")
        return False

    current = matches[0]
    organisationId = current.get("organisationId")
    mayDelete = self.checkCombinedPermission(
        TrusteePositionDocument,
        "delete",
        organisationId,
        current.get("contractId"),
        current.get("_createdBy"),
    )
    if mayDelete:
        return self.db.recordDelete(TrusteePositionDocument, linkId)

    logger.warning(f"User {self.userId} lacks permission to delete position-document link in org {organisationId}")
    return False
|
|
|
|
# ===== Trustee-specific Access Check =====
|
|
|
|
def getUserAccessForOrganisation(self, userId: str, organisationId: str) -> List[Dict[str, Any]]:
    """Load the TrusteeAccess rows of one user within one organisation.

    Args:
        userId: ID of the user whose access rows are requested.
        organisationId: ID of the organisation to restrict the lookup to.

    Returns:
        Whatever the connector's ``getRecordset`` returns for the combined
        ``userId`` / ``organisationId`` filter.
    """
    accessFilter = {"userId": userId, "organisationId": organisationId}
    return self.db.getRecordset(TrusteeAccess, accessFilter)
|
|
|
|
def getAllUserAccess(self, userId: str) -> List[Dict[str, Any]]:
    """Load every TrusteeAccess row of one user, regardless of organisation.

    Args:
        userId: ID of the user whose access rows are requested.

    Returns:
        Whatever the connector's ``getRecordset`` returns for the ``userId``
        filter.
    """
    userFilter = {"userId": userId}
    accessRows = self.db.getRecordset(TrusteeAccess, userFilter)
    return accessRows
|
|
|
|
def getUserTrusteeRoles(self, userId: str, organisationId: str, contractId: Optional[str] = None) -> List[str]:
    """
    Collect the distinct trustee roles a user holds in an organisation.

    An access row contributes its role when any of these holds:
      * the row has no ``contractId`` (organisation-wide access),
      * a ``contractId`` was requested and the row is scoped to exactly it,
      * no ``contractId`` was requested (``None``), in which case
        contract-scoped rows count as well.

    Args:
        userId: User ID to check
        organisationId: Organisation ID
        contractId: Optional contract ID for contract-specific access

    Returns:
        Role IDs in first-seen order, without duplicates
    """
    collected = []

    for entry in self.getUserAccessForOrganisation(userId, organisationId):
        scopedContract = entry.get("contractId")
        role = entry.get("roleId")

        applies = (
            scopedContract is None
            or (contractId and scopedContract == contractId)
            or contractId is None
        )
        if applies and role not in collected:
            collected.append(role)

    return collected
|
|
|
|
def checkUserTrusteePermission(
    self,
    userId: str,
    organisationId: str,
    requiredRole: str,
    contractId: Optional[str] = None
) -> bool:
    """
    Tell whether a user holds a given trustee role in an organisation.

    The role list is resolved via ``getUserTrusteeRoles`` with the same
    organisation/contract arguments.

    Args:
        userId: User ID to check
        organisationId: Organisation ID
        requiredRole: Required role (userreport, admin, operate)
        contractId: Optional contract ID for contract-specific access

    Returns:
        True if the required role is among the user's roles
    """
    heldRoles = self.getUserTrusteeRoles(userId, organisationId, contractId)
    hasRole = requiredRole in heldRoles
    return hasRole
|
|
|
|
def checkCombinedPermission(
    self,
    modelClass: type,
    operation: str,
    organisationId: Optional[str] = None,
    contractId: Optional[str] = None,
    recordCreatedBy: Optional[str] = None
) -> bool:
    """
    Decide whether the current user may perform an operation on a model.

    System RBAC is consulted first: ``AccessLevel.NONE`` denies,
    ``AccessLevel.ALL`` grants without further checks. Any other level is
    sufficient when no ``organisationId`` is given. Otherwise the user's
    trustee roles for the organisation/contract decide:
      * admin: everything
      * operate: contracts, documents, positions and links; organisations
        read-only
      * userreport: create documents/positions/links, touch existing ones
        only when ``recordCreatedBy`` is the current user; organisations and
        contracts read-only

    Args:
        modelClass: The model class (e.g., TrusteeContract)
        operation: Operation type (read, create, update, delete)
        organisationId: Optional organisation ID for feature-level check
        contractId: Optional contract ID for contract-level filtering
        recordCreatedBy: Optional creator ID for userreport role checks

    Returns:
        True if user has permission
    """
    # System-level gate
    systemLevel = self.getRbacAccessLevel(modelClass, operation)
    if systemLevel == AccessLevel.NONE:
        return False
    if systemLevel == AccessLevel.ALL or organisationId is None:
        return True

    # Feature-level gate via trustee.access
    grantedRoles = self.getUserTrusteeRoles(self.userId, organisationId, contractId)
    if not grantedRoles:
        return False
    if "admin" in grantedRoles:
        return True

    isRead = operation == "read"
    ownableModels = (TrusteeDocument, TrusteePosition, TrusteePositionDocument)

    if "operate" in grantedRoles:
        if modelClass in (TrusteeContract,) + ownableModels:
            return True
        if modelClass == TrusteeOrganisation and isRead:
            return True

    if "userreport" in grantedRoles:
        if modelClass in ownableModels:
            # Creating is always fine; anything else requires ownership.
            if operation == "create" or recordCreatedBy == self.userId:
                return True
        if modelClass in (TrusteeOrganisation, TrusteeContract) and isRead:
            return True

    return False
|
|
|
|
def filterRecordsByTrusteeAccess(
    self,
    records: List[Dict[str, Any]],
    modelClass: type
) -> List[Dict[str, Any]]:
    """
    Keep only the records the current user may see per trustee.access.

    Empty input and users whose system RBAC read level is
    ``AccessLevel.ALL`` pass through unchanged; users without any access row
    get an empty list. Otherwise each record is judged by the roles the user
    holds in the record's organisation:
      * organisations: visible when the user has any access row for them
      * admin / operate: visible; records carrying a ``contractId`` also
        need organisation-wide access or a row for that contract
      * userreport: documents, positions and links only when created by the
        user (plus the same contract rule); contracts when the user has
        organisation-wide access or a row for that contract

    Args:
        records: List of records to filter
        modelClass: The model class for determining filter logic

    Returns:
        Filtered list of records
    """
    if not records:
        return records

    if self.getRbacAccessLevel(modelClass, "read") == AccessLevel.ALL:
        return records

    grants = self.getAllUserAccess(self.userId)
    if not grants:
        # No trustee access at all - nothing is visible
        return []

    # Index the grants per organisation.
    rolesByOrg = {}
    contractsByOrg = {}
    orgWideOrgs = set()  # organisations with at least one grant lacking a contractId
    for grant in grants:
        grantOrg = grant.get("organisationId")
        rolesByOrg.setdefault(grantOrg, set()).add(grant.get("roleId"))
        orgContracts = contractsByOrg.setdefault(grantOrg, set())
        grantContract = grant.get("contractId")
        if grantContract is None:
            orgWideOrgs.add(grantOrg)
        else:
            orgContracts.add(grantContract)

    def _contractAllowed(orgKey, contractKey):
        # Organisation-wide access covers every contract of that organisation.
        return orgKey in orgWideOrgs or contractKey in contractsByOrg[orgKey]

    ownedModels = (TrusteeDocument, TrusteePosition, TrusteePositionDocument)
    visible = []

    for row in records:
        # Organisation rows are matched on their own id.
        if modelClass == TrusteeOrganisation:
            if row.get("id") in rolesByOrg:
                visible.append(row)
            continue

        rowOrg = row.get("organisationId")
        if rowOrg not in rolesByOrg:
            continue

        rowContract = row.get("contractId")
        orgRoles = rolesByOrg[rowOrg]

        if "admin" in orgRoles or "operate" in orgRoles:
            keep = not rowContract or _contractAllowed(rowOrg, rowContract)
        elif "userreport" not in orgRoles:
            keep = False
        elif modelClass in ownedModels:
            keep = row.get("_createdBy") == self.userId and (
                not rowContract or _contractAllowed(rowOrg, rowContract)
            )
        elif modelClass == TrusteeContract:
            # Contract rows without a contractId field are matched on their id.
            keep = _contractAllowed(rowOrg, rowContract or row.get("id"))
        else:
            keep = False

        if keep:
            visible.append(row)

    return visible
|