gateway/modules/features/trustee/interfaceFeatureTrustee.py
2026-04-26 08:31:35 +02:00

1586 lines
67 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to Trustee database.
Manages trustee organisations, roles, access, contracts, documents, and positions.
"""
import logging
import math
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Union
from pydantic import ValidationError
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import registerDatabase
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC, getRecordsetPaginatedWithRBAC, getDistinctColumnValuesWithRBAC
from modules.security.rbac import RbacClass
from modules.datamodels.datamodelUam import User, AccessLevel
from modules.datamodels.datamodelRbac import AccessRuleContext
from .datamodelFeatureTrustee import (
TrusteeOrganisation,
TrusteeRole,
TrusteeAccess,
TrusteeContract,
TrusteeDocument,
TrusteePosition,
)
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Name of the database used by the trustee feature; passed to the connector in
# TrusteeObjects._initializeDatabase.
trusteeDatabase = "poweron_trustee"
# Runs at import time. NOTE(review): presumably announces the database name to a
# shared registry - confirm against modules.shared.dbRegistry.
registerDatabase(trusteeDatabase)
# Singleton factory for TrusteeObjects instances per context.
# Keys are built in getInterface as "<userId>_<mandateId>_<featureInstanceId>".
_trusteeInterfaces: Dict[str, "TrusteeObjects"] = {}
def _toSafeFloat(value: Any, defaultValue: float = 0.0) -> float:
"""Convert mixed numeric inputs (str/number) to float safely."""
if value is None or value == "":
return defaultValue
if isinstance(value, (int, float)):
return float(value)
try:
textValue = str(value).strip().replace("'", "").replace(" ", "")
if "," in textValue and "." not in textValue:
textValue = textValue.replace(",", ".")
return float(textValue)
except Exception:
return defaultValue
def _normaliseIsoDate(value: Any) -> Optional[str]:
"""Normalise date-like input to ISO date format YYYY-MM-DD."""
if value is None or value == "":
return None
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(float(value), tz=timezone.utc).date().isoformat()
except Exception:
return None
textValue = str(value).strip()
if not textValue:
return None
# Try common explicit formats first (incl. Swiss/European notation).
for formatValue in (
"%Y-%m-%d",
"%d.%m.%Y",
"%d/%m/%Y",
"%d-%m-%Y",
"%Y/%m/%d",
"%Y.%m.%d",
):
try:
return datetime.strptime(textValue, formatValue).date().isoformat()
except Exception:
continue
# Try ISO datetime variants.
try:
return datetime.fromisoformat(textValue.replace("Z", "+00:00")).date().isoformat()
except Exception:
return None
def _normaliseTimestamp(value: Any, fallbackIsoDate: Optional[str] = None) -> Optional[float]:
"""Normalise timestamp input to unix seconds (float)."""
if value is None or value == "":
if fallbackIsoDate:
try:
fallbackDate = datetime.strptime(fallbackIsoDate, "%Y-%m-%d").replace(tzinfo=timezone.utc)
return float(fallbackDate.timestamp())
except Exception:
return None
return None
if isinstance(value, (int, float)):
return float(value)
textValue = str(value).strip()
if not textValue:
return None
numericTimestamp = _toSafeFloat(textValue, defaultValue=float("nan"))
if not math.isnan(numericTimestamp):
return float(numericTimestamp)
# Accept date-only input and normalise to midnight UTC timestamp.
isoDate = _normaliseIsoDate(textValue)
if isoDate:
try:
parsedDate = datetime.strptime(isoDate, "%Y-%m-%d").replace(tzinfo=timezone.utc)
return float(parsedDate.timestamp())
except Exception:
return None
return None
def _sanitisePositionPayload(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a normalised copy of a TrusteePosition payload for DB writes.

    The input dict is not modified. Dates, timestamps, amounts, currencies,
    document type and text fields are coerced; every field handled here is
    always present in the result (with a default when it was missing).
    """
    payload = dict(data or {})

    # Dates / timestamps: the booking timestamp falls back to the valuta date.
    valutaIso = _normaliseIsoDate(payload.get("valuta"))
    payload["valuta"] = valutaIso
    payload["transactionDateTime"] = _normaliseTimestamp(
        payload.get("transactionDateTime"),
        fallbackIsoDate=valutaIso,
    )

    # Amounts: the original amount defaults to the booking amount.
    bookingAmount = _toSafeFloat(payload.get("bookingAmount"), defaultValue=0.0)
    payload["bookingAmount"] = bookingAmount
    payload["originalAmount"] = _toSafeFloat(
        payload.get("originalAmount"),
        defaultValue=bookingAmount,
    )
    for vatField in ("vatPercentage", "vatAmount"):
        payload[vatField] = _toSafeFloat(payload.get(vatField), defaultValue=0.0)

    # Currencies: booking defaults to CHF, original defaults to booking.
    bookingCurrency = payload.get("bookingCurrency") or "CHF"
    originalCurrency = payload.get("originalCurrency") or bookingCurrency
    payload["bookingCurrency"] = str(bookingCurrency).upper()
    payload["originalCurrency"] = str(originalCurrency).upper()

    if payload.get("dueDate"):
        payload["dueDate"] = _normaliseIsoDate(payload["dueDate"])

    # Document type: anything outside the known set is stored as None.
    validDocTypes = {"invoice", "expense_receipt", "bank_document", "contract", "unknown"}
    rawDocType = payload.get("documentType")
    cleanDocType = str(rawDocType).strip().lower() if rawDocType else None
    payload["documentType"] = cleanDocType if cleanDocType in validDocTypes else None

    # These text fields are never None: a missing value becomes "".
    for fieldName in ("company", "desc", "tags"):
        rawText = payload.get(fieldName)
        payload[fieldName] = "" if rawText is None else str(rawText).strip()

    # Payment fields: blank or missing values become None.
    for fieldName in ("payeeIban", "payeeName", "payeeBic", "paymentReference"):
        rawText = payload.get(fieldName)
        payload[fieldName] = (str(rawText).strip() or None) if rawText else None

    return payload
def getInterface(currentUser: User, mandateId: Optional[Union[str, uuid.UUID]] = None, featureInstanceId: Optional[str] = None) -> "TrusteeObjects":
    """Return the cached TrusteeObjects for this user context, creating it on first use.

    One instance is kept per (user ID, mandate ID, feature instance ID)
    combination. A cached instance has its user context refreshed on every call.

    Args:
        currentUser: The authenticated user (must have an id).
        mandateId: The mandate ID from RequestContext (X-Mandate-Id header). Optional.
        featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header).

    Raises:
        ValueError: If currentUser is missing or has no id.
    """
    if not currentUser or not currentUser.id:
        raise ValueError("Valid user context required")

    mandateKey = str(mandateId) if mandateId else None
    instanceKey = str(featureInstanceId) if featureInstanceId else None

    # The feature instance ID is part of the key so instances stay isolated.
    cacheKey = f"{currentUser.id}_{mandateKey}_{instanceKey}"

    interface = _trusteeInterfaces.get(cacheKey)
    if interface is None:
        interface = TrusteeObjects(currentUser, mandateId=mandateKey, featureInstanceId=instanceKey)
        _trusteeInterfaces[cacheKey] = interface
    else:
        # Refresh the user context on the cached instance.
        interface.setUserContext(currentUser, mandateId=mandateKey, featureInstanceId=instanceKey)
    return interface
class TrusteeObjects:
"""
Interface to Trustee database.
Manages trustee organisations, roles, access, contracts, documents, and positions.
"""
# Feature code for RBAC objectKey construction
# Used to build: data.feature.trustee.{TableName}
FEATURE_CODE = "trustee"
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """Set up the Trustee interface and open its database connection.

    Args:
        currentUser: The authenticated user
        mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
        featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
    """
    # The RBAC helper is only created once a user context is set.
    self.rbac = None
    self.currentUser = currentUser
    self.userId = currentUser.id if currentUser else None
    # mandateId comes from the request context parameter, not from the user object.
    self.mandateId = mandateId
    self.featureInstanceId = featureInstanceId

    # The connector is created with self.userId, so this runs after the fields above.
    self._initializeDatabase()

    if not currentUser:
        return
    self.setUserContext(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
    """Bind this interface to a user and request context.

    Stores user, mandate and feature instance, creates a fresh RBAC helper and
    passes the user ID on to the database connector. Without a user the call
    only logs and returns.

    Args:
        currentUser: The authenticated user
        mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
        featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)

    Raises:
        ValueError: If the user has no id.
    """
    if not currentUser:
        logger.info("Initializing interface without user context")
        return

    self.currentUser = currentUser
    self.userId = currentUser.id
    # mandateId is ALWAYS optional here: it comes from the request context, not
    # from the user. Users are not assigned to mandates by design; without a
    # mandateId, operations are filtered to accessible mandates via RBAC.
    self.mandateId, self.featureInstanceId = mandateId, featureInstanceId

    if not self.userId:
        raise ValueError("Invalid user context: id is required")

    self.userLanguage = currentUser.language

    # RBAC helper on top of this feature's connection plus the root app connector.
    from modules.security.rootAccess import getRootDbAppConnector
    rootConnector = getRootDbAppConnector()
    self.rbac = RbacClass(self.db, dbApp=rootConnector)

    # Tell the connector which user subsequent operations belong to.
    self.db.updateContext(self.userId)
def __del__(self):
"""Cleanup method to close database connection."""
if hasattr(self, "db") and self.db is not None:
try:
self.db.close()
except Exception as e:
logger.error(f"Error closing database connection: {e}")
def _initializeDatabase(self):
    """Create the DatabaseConnector for the trustee database and store it on self.db.

    Connection settings are read from APP_CONFIG. Any failure is logged and re-raised.
    """
    try:
        self.db = DatabaseConnector(
            dbHost=APP_CONFIG.get("DB_HOST", "_no_config_default_data"),
            dbDatabase=trusteeDatabase,
            dbUser=APP_CONFIG.get("DB_USER"),
            dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
            dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
            userId=self.userId,
        )
        logger.info(f"Trustee database initialized successfully for user {self.userId}")
    except Exception as e:
        logger.error(f"Failed to initialize Trustee database: {str(e)}")
        raise
def checkRbacPermission(
    self,
    modelClass: type,
    operation: str,
    recordId: Optional[str] = None
) -> bool:
    """Check RBAC permission for a specific operation on a table.

    Args:
        modelClass: Model class; its class name is used as the table name.
        operation: Name of the permission attribute to check (e.g. "create").
        recordId: Accepted for interface compatibility; not used by this check.

    Returns:
        True only if the user has view permission on the table AND the access
        level for the operation is not NONE. False when no RBAC helper or user
        is set.
    """
    if not self.rbac or not self.currentUser:
        return False

    from modules.interfaces.interfaceRbac import buildDataObjectKey
    # Use the class-level FEATURE_CODE unless an instance-level featureCode is
    # set. Previously only self.featureCode was consulted, which nothing in the
    # visible part of this class assigns, so the key was built without a
    # feature code while the read queries pass FEATURE_CODE.
    featureCode = getattr(self, "featureCode", None) or self.FEATURE_CODE
    objectKey = buildDataObjectKey(modelClass.__name__, featureCode=featureCode)

    permissions = self.rbac.getUserPermissions(
        self.currentUser,
        AccessRuleContext.DATA,
        objectKey,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId
    )

    # View permission is a precondition for every other operation.
    if not permissions.view:
        return False

    return getattr(permissions, operation, AccessLevel.NONE) != AccessLevel.NONE
def getRbacAccessLevel(self, modelClass: type, operation: str) -> AccessLevel:
    """Get the RBAC access level for a specific operation on a table.

    Args:
        modelClass: Model class; its class name is used as the table name.
        operation: Name of the permission attribute to read (e.g. "update").

    Returns:
        AccessLevel (ALL, GROUP, MY, NONE) - determines what filtering is needed.
        NONE is returned when no RBAC helper or user is set, when the user has
        no view permission, or when the permissions object has no such operation.
    """
    if not self.rbac or not self.currentUser:
        return AccessLevel.NONE

    from modules.interfaces.interfaceRbac import buildDataObjectKey
    # Use the class-level FEATURE_CODE unless an instance-level featureCode is
    # set. Previously only self.featureCode was consulted, which nothing in the
    # visible part of this class assigns, so the key was built without a
    # feature code while the read queries pass FEATURE_CODE.
    featureCode = getattr(self, "featureCode", None) or self.FEATURE_CODE
    objectKey = buildDataObjectKey(modelClass.__name__, featureCode=featureCode)

    permissions = self.rbac.getUserPermissions(
        self.currentUser,
        AccessRuleContext.DATA,
        objectKey,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId
    )

    # View permission is a precondition for every other operation.
    if not permissions.view:
        return AccessLevel.NONE

    return getattr(permissions, operation, AccessLevel.NONE)
# ===== Pagination Helper Functions =====
def _applyFilters(self, records: List[Dict[str, Any]], params: Optional[PaginationParams]) -> List[Dict[str, Any]]:
"""
Apply filter criteria to records.
Supports:
- General search: params.filters["search"] - searches across all text fields
- Field-specific filters: params.filters
- Simple: {"status": "running"} - equals match
- With operator: {"status": {"operator": "equals", "value": "running"}}
- Operators: equals, contains, gt, gte, lt, lte, in, notIn, startsWith, endsWith
Args:
records: List of record dictionaries to filter
params: PaginationParams with filters (search is inside filters)
Returns:
Filtered list of records
"""
if not params or not records:
return records
# Get filters safely (may be None)
filters = getattr(params, 'filters', None)
if not filters:
return records
filtered = records
# Handle general search across text fields (search is inside filters)
searchTerm = filters.get("search") if isinstance(filters, dict) else None
if searchTerm:
searchTerm = str(searchTerm).lower()
if searchTerm:
searchFiltered = []
for record in filtered:
found = False
for key, value in record.items():
if isinstance(value, str) and searchTerm in value.lower():
found = True
break
elif isinstance(value, (int, float)) and searchTerm in str(value):
found = True
break
if found:
searchFiltered.append(record)
filtered = searchFiltered
# Handle field-specific filters
if filters:
for fieldName, filterValue in filters.items():
if fieldName == "search":
continue # Already handled above
fieldFiltered = []
for record in filtered:
if fieldName not in record:
continue
recordValue = record.get(fieldName)
# Handle simple value (equals operator)
if not isinstance(filterValue, dict):
if str(recordValue).lower() == str(filterValue).lower():
fieldFiltered.append(record)
continue
# Handle filter with operator
operator = filterValue.get("operator", "equals")
filterVal = filterValue.get("value")
matches = False
if operator in ["equals", "eq"]:
matches = str(recordValue).lower() == str(filterVal).lower()
elif operator == "contains":
recordStr = str(recordValue).lower() if recordValue is not None else ""
filterStr = str(filterVal).lower() if filterVal is not None else ""
matches = filterStr in recordStr
elif operator == "startsWith":
recordStr = str(recordValue).lower() if recordValue is not None else ""
filterStr = str(filterVal).lower() if filterVal is not None else ""
matches = recordStr.startswith(filterStr)
elif operator == "endsWith":
recordStr = str(recordValue).lower() if recordValue is not None else ""
filterStr = str(filterVal).lower() if filterVal is not None else ""
matches = recordStr.endswith(filterStr)
elif operator == "gt":
try:
recordNum = float(recordValue) if recordValue is not None else float('-inf')
filterNum = float(filterVal) if filterVal is not None else float('-inf')
matches = recordNum > filterNum
except (ValueError, TypeError):
matches = False
elif operator == "gte":
try:
recordNum = float(recordValue) if recordValue is not None else float('-inf')
filterNum = float(filterVal) if filterVal is not None else float('-inf')
matches = recordNum >= filterNum
except (ValueError, TypeError):
matches = False
elif operator == "lt":
try:
recordNum = float(recordValue) if recordValue is not None else float('inf')
filterNum = float(filterVal) if filterVal is not None else float('inf')
matches = recordNum < filterNum
except (ValueError, TypeError):
matches = False
elif operator == "lte":
try:
recordNum = float(recordValue) if recordValue is not None else float('inf')
filterNum = float(filterVal) if filterVal is not None else float('inf')
matches = recordNum <= filterNum
except (ValueError, TypeError):
matches = False
elif operator == "in":
if isinstance(filterVal, list):
matches = recordValue in filterVal
else:
matches = False
elif operator == "notIn":
if isinstance(filterVal, list):
matches = recordValue not in filterVal
else:
matches = False
if matches:
fieldFiltered.append(record)
filtered = fieldFiltered
return filtered
def _applySorting(self, records: List[Dict[str, Any]], params: Optional[PaginationParams]) -> List[Dict[str, Any]]:
"""Apply multi-level sorting to records using stable sort."""
if not params:
return records
# Get sort safely (may be None or empty list)
sortFields = getattr(params, 'sort', None)
if not sortFields:
return records
sortedRecords = list(records)
# Sort from least significant to most significant field (reverse order)
# Python's sort is stable, so this creates proper multi-level sorting
for sortField in reversed(sortFields):
# Handle both dict and object formats
if isinstance(sortField, dict):
fieldName = sortField.get("field")
direction = sortField.get("direction", "asc")
else:
fieldName = getattr(sortField, "field", None)
direction = getattr(sortField, "direction", "asc")
if not fieldName:
continue
isDesc = (direction == "desc")
def makeSortKey(fName):
def sortKey(record):
value = record.get(fName)
# Handle None values - place them at the end for both directions
if value is None:
return (1, "") # sorts after (0, ...)
else:
if isinstance(value, (int, float)):
return (0, value)
elif isinstance(value, str):
return (0, value.lower())
elif isinstance(value, bool):
return (0, value)
else:
return (0, str(value))
return sortKey
sortedRecords.sort(key=makeSortKey(fieldName), reverse=isDesc)
return sortedRecords
# ===== Organisation CRUD =====
def createOrganisation(self, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
    """Create a new organisation.

    The caller-supplied ID must be a string of 3-50 characters consisting only
    of letters, digits, hyphens and underscores.

    Args:
        data: Organisation fields. "mandateId" is always overwritten with the
            current context; "featureInstanceId" is filled in when absent.
            The dict is modified in place.

    Returns:
        The created organisation, or None when the user lacks permission, the
        ID is invalid, or the database did not return a created record.
    """
    if not self.checkRbacPermission(TrusteeOrganisation, "create"):
        logger.warning(f"User {self.userId} lacks permission to create organisation")
        return None

    # Set mandateId and featureInstanceId from context for proper data isolation
    data["mandateId"] = self.mandateId
    if "featureInstanceId" not in data:
        data["featureInstanceId"] = self.featureInstanceId

    # Validate ID format (alphanumeric, hyphens, underscores, 3-50 chars).
    # A missing, None or non-string ID is reported as length 0 instead of
    # failing inside the log call.
    orgId = data.get("id", "")
    if not isinstance(orgId, str) or not (3 <= len(orgId) <= 50):
        idLength = len(orgId) if isinstance(orgId, str) else 0
        logger.error(f"Invalid organisation ID length: {idLength}")
        return None

    import re
    # fullmatch instead of match(...$): "$" also matches just before a trailing
    # newline, which would let an ID like "abc\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', orgId):
        logger.error(f"Invalid organisation ID format: {orgId}")
        return None

    createdRecord = self.db.recordCreate(TrusteeOrganisation, data)
    if createdRecord and createdRecord.get("id"):
        # Keys starting with "_" are dropped before building the model.
        return TrusteeOrganisation(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
    return None
def getOrganisation(self, orgId: str) -> Optional[TrusteeOrganisation]:
    """Load one organisation by its ID, or return None when it does not exist.

    NOTE(review): this reads through the connector directly, without the
    permission check the create/update/delete methods perform - confirm intended.
    """
    matches = self.db.getRecordset(TrusteeOrganisation, recordFilter={"id": orgId})
    if matches:
        # Keys starting with "_" are dropped before building the model.
        publicFields = {name: val for name, val in matches[0].items() if not name.startswith("_")}
        return TrusteeOrganisation(**publicFields)
    return None
def getAllOrganisations(self, params: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List organisations through the RBAC-aware paginated query.

    Note: Organisations are managed at system level (by mandate).
    Feature-level filtering (trustee.access) is NOT applied here because:
    - Admin users with system RBAC can manage all orgs in their mandate
    - trustee.access grants access to specific orgs for other users
    - New organisations wouldn't be visible without an access record

    Args:
        params: Optional pagination settings, passed through unchanged.
    """
    logger.debug(f"getAllOrganisations called for user {self.userId}, mandateId: {self.mandateId}")

    queryArgs = dict(
        connector=self.db,
        modelClass=TrusteeOrganisation,
        currentUser=self.currentUser,
        pagination=params,
        recordFilter=None,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE,
    )
    return getRecordsetPaginatedWithRBAC(**queryArgs)
def updateOrganisation(self, orgId: str, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
    """Modify an existing organisation; its ID is fixed after creation.

    Returns:
        The updated organisation, or None when the user lacks permission, the
        payload tries to change the ID, or the database reports no update.
    """
    if not self.checkRbacPermission(TrusteeOrganisation, "update"):
        logger.warning(f"User {self.userId} lacks permission to update organisation")
        return None

    # A payload ID is only tolerated when it equals the ID being updated.
    if data.get("id", orgId) != orgId:
        logger.error("Organisation ID cannot be changed")
        return None
    data["id"] = orgId

    stored = self.db.recordModify(TrusteeOrganisation, orgId, data)
    if stored:
        publicFields = {name: val for name, val in stored.items() if not name.startswith("_")}
        return TrusteeOrganisation(**publicFields)
    return None
def deleteOrganisation(self, orgId: str) -> bool:
    """Remove an organisation; returns False when the user lacks delete permission."""
    mayDelete = self.checkRbacPermission(TrusteeOrganisation, "delete")
    if mayDelete:
        return self.db.recordDelete(TrusteeOrganisation, orgId)
    logger.warning(f"User {self.userId} lacks permission to delete organisation")
    return False
# ===== Role CRUD =====
def createRole(self, data: Dict[str, Any]) -> Optional[TrusteeRole]:
    """Store a new role; the payload must carry a non-empty "id".

    "mandateId" is always taken from the current context, "featureInstanceId"
    only when the payload has none. The payload dict is modified in place.

    Returns:
        The created role, or None when permission is missing, the ID is empty,
        or the database did not return a created record.
    """
    if not self.checkRbacPermission(TrusteeRole, "create"):
        logger.warning(f"User {self.userId} lacks permission to create role")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)

    if not data.get("id", ""):
        logger.error("Role ID is required")
        return None

    stored = self.db.recordCreate(TrusteeRole, data)
    if not stored or not stored.get("id"):
        return None
    publicFields = {name: val for name, val in stored.items() if not name.startswith("_")}
    return TrusteeRole(**publicFields)
def getRole(self, roleId: str) -> Optional[TrusteeRole]:
    """Load one role by its ID, or return None when it does not exist."""
    found = self.db.getRecordset(TrusteeRole, recordFilter={"id": roleId})
    firstRow = found[0] if found else None
    if firstRow is None:
        return None
    # Keys starting with "_" are dropped before building the model.
    return TrusteeRole(**{name: val for name, val in firstRow.items() if not name.startswith("_")})
def getAllRoles(self, params: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List roles through the RBAC-aware paginated query.

    Roles are not filtered by organisation since they define the role types.
    Users with ALL access level always get the query result; everyone else
    gets it only if they have at least one trustee access record, otherwise
    an empty result.

    NOTE(post-filter): the access-record check runs after the paginated query.
    When the user has no access record, a paginated result is emptied and its
    totals are reset to zero.
    """
    result = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeRole,
        currentUser=self.currentUser,
        pagination=params,
        recordFilter=None,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    if self.getRbacAccessLevel(TrusteeRole, "read") == AccessLevel.ALL:
        return result

    if self.getAllUserAccess(self.userId):
        return result

    # No access record at all: hand back an empty result of the same shape.
    if not isinstance(result, PaginatedResult):
        return []
    result.items = []
    result.totalItems = 0
    result.totalPages = 0
    return result
def updateRole(self, roleId: str, data: Dict[str, Any]) -> Optional[TrusteeRole]:
    """Modify an existing role.

    The payload's "id" is forced to roleId. Returns None when the user lacks
    update permission or the database reports no update.
    """
    if not self.checkRbacPermission(TrusteeRole, "update"):
        logger.warning(f"User {self.userId} lacks permission to update role")
        return None

    data["id"] = roleId
    stored = self.db.recordModify(TrusteeRole, roleId, data)
    if stored:
        publicFields = {name: val for name, val in stored.items() if not name.startswith("_")}
        return TrusteeRole(**publicFields)
    return None
def deleteRole(self, roleId: str) -> bool:
    """Remove a role unless access records still reference it.

    Returns False when the user lacks delete permission or the role is in use.
    """
    if not self.checkRbacPermission(TrusteeRole, "delete"):
        logger.warning(f"User {self.userId} lacks permission to delete role")
        return False

    # Any access record pointing at this role blocks the deletion.
    assignments = self.db.getRecordset(TrusteeAccess, recordFilter={"roleId": roleId})
    if not assignments:
        return self.db.recordDelete(TrusteeRole, roleId)

    logger.error(f"Cannot delete role {roleId}: still in use")
    return False
# ===== Access CRUD =====
def createAccess(self, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
    """Store a new access record.

    Two checks apply: the system RBAC create permission, and - unless the
    user's create access level is ALL - the admin role for the organisation
    named in the payload. A missing "id" is replaced by a generated UUID.
    The payload dict is modified in place.

    Returns:
        The created access record, or None when a check fails or the database
        did not return a created record.
    """
    # System-level RBAC comes first.
    if not self.checkRbacPermission(TrusteeAccess, "create"):
        logger.warning(f"User {self.userId} lacks system permission to create access")
        return None

    organisationId = data.get("organisationId")

    # Feature-level check is skipped for users with ALL access level.
    hasGlobalAccess = self.getRbacAccessLevel(TrusteeAccess, "create") == AccessLevel.ALL
    if not hasGlobalAccess:
        if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
            logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
            return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    stored = self.db.recordCreate(TrusteeAccess, data)
    if not stored or not stored.get("id"):
        return None
    publicFields = {name: val for name, val in stored.items() if not name.startswith("_")}
    return TrusteeAccess(**publicFields)
def getAccess(self, accessId: str) -> Optional[TrusteeAccess]:
    """Load one access record by its ID, or return None when it does not exist."""
    rows = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    if rows:
        # Keys starting with "_" are dropped before building the model.
        publicFields = {name: val for name, val in rows[0].items() if not name.startswith("_")}
        return TrusteeAccess(**publicFields)
    return None
def getAllAccess(self, params: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List access records, restricted to organisations the user administers.

    Users with ALL access level get the query result unchanged. For everyone
    else only records of organisations where the user holds the "admin" role
    are kept.

    NOTE(post-filter): the organisation filter runs after the paginated query
    and only replaces the items, so totals may overcount when the user has
    restricted org access.
    """
    result = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        pagination=params,
        recordFilter=None,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    if self.getRbacAccessLevel(TrusteeAccess, "read") == AccessLevel.ALL:
        return result

    # Organisations in which the current user holds the admin role.
    adminOrgs = {
        entry.get("organisationId")
        for entry in self.getAllUserAccess(self.userId)
        if entry.get("roleId") == "admin"
    }

    def _onlyAdminOrgs(rows):
        if not adminOrgs:
            return []
        return [row for row in rows if row.get("organisationId") in adminOrgs]

    if isinstance(result, PaginatedResult):
        result.items = _onlyAdminOrgs(result.items)
        return result
    return _onlyAdminOrgs(result)
def getAccessByOrganisation(self, organisationId: str, pagination: Optional[PaginationParams] = None) -> Union[List[TrusteeAccess], PaginatedResult]:
    """List the access records of one organisation as TrusteeAccess models.

    The caller needs the admin role for that organisation; otherwise an empty
    list is returned (also when pagination was requested).
    """
    if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return []

    def _toModel(row):
        # Keys starting with "_" are dropped before building the model.
        return TrusteeAccess(**{name: val for name, val in row.items() if not name.startswith("_")})

    result = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"organisationId": organisationId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    if not isinstance(result, PaginatedResult):
        return [_toModel(row) for row in result]
    result.items = [_toModel(row) for row in result.items]
    return result
def getAccessByUser(self, userId: str, pagination: Optional[PaginationParams] = None) -> Union[List[TrusteeAccess], PaginatedResult]:
    """List the access records of one user as TrusteeAccess models.

    Callers with ALL access level see every record of that user. Everyone else
    only sees records of organisations where the caller holds the "admin" role.

    NOTE(post-filter): the organisation filter runs after the paginated query
    and only replaces the items, so totals may overcount when the caller has
    restricted org access.
    """
    result = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"userId": userId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    restricted = self.getRbacAccessLevel(TrusteeAccess, "read") != AccessLevel.ALL
    adminOrgs = set()
    if restricted:
        # Organisations in which the calling user holds the admin role.
        for entry in self.getAllUserAccess(self.userId):
            if entry.get("roleId") == "admin":
                adminOrgs.add(entry.get("organisationId"))

    def _visibleModels(rows):
        # Filter first (when restricted), then convert the survivors to models.
        if restricted:
            rows = [row for row in rows if row.get("organisationId") in adminOrgs]
        return [
            TrusteeAccess(**{name: val for name, val in row.items() if not name.startswith("_")})
            for row in rows
        ]

    if isinstance(result, PaginatedResult):
        result.items = _visibleModels(result.items)
        return result
    return _visibleModels(result)
def updateAccess(self, accessId: str, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
    """Update an access record. Requires admin role for the organisation or ALL access level.

    Unless the user's update access level is ALL, the admin role is required
    for the record's current organisation AND, when the payload moves the
    record to a different organisation, for that target organisation as well
    (the same rule createAccess applies to new records).

    Args:
        accessId: ID of the access record to update.
        data: Fields to change; "id" is forced to accessId. Modified in place.

    Returns:
        The updated access record, or None when a permission check fails, the
        record does not exist, or the database reports no update.
    """
    # Check system RBAC first
    if not self.checkRbacPermission(TrusteeAccess, "update"):
        logger.warning(f"User {self.userId} lacks system permission to update access")
        return None

    # Get existing access to check organisation
    existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    existing = existingRecords[0] if existingRecords else None
    if not existing:
        logger.warning(f"Access record {accessId} not found")
        return None

    organisationId = existing.get("organisationId")

    # Users with ALL access level bypass feature-level permission check
    accessLevel = self.getRbacAccessLevel(TrusteeAccess, "update")
    if accessLevel != AccessLevel.ALL:
        # Must have admin role for the organisation the record belongs to.
        if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
            logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
            return None

        # Moving the record to another organisation needs admin rights there
        # too; otherwise an admin of one organisation could create access in
        # an organisation they do not administer.
        targetOrganisationId = data.get("organisationId")
        if targetOrganisationId is not None and targetOrganisationId != organisationId:
            if not self.checkUserTrusteePermission(self.userId, targetOrganisationId, "admin"):
                logger.warning(f"User {self.userId} lacks admin role for organisation {targetOrganisationId}")
                return None

    data["id"] = accessId
    updatedRecord = self.db.recordModify(TrusteeAccess, accessId, data)
    if not updatedRecord:
        return None
    # Keys starting with "_" are dropped before building the model.
    return TrusteeAccess(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
def deleteAccess(self, accessId: str) -> bool:
    """Remove an access record.

    Needs the system RBAC delete permission and - unless the user's delete
    access level is ALL - the admin role for the record's organisation.

    Returns:
        The connector's delete result, or False when a check fails or the
        record does not exist.
    """
    # System-level RBAC comes first.
    if not self.checkRbacPermission(TrusteeAccess, "delete"):
        logger.warning(f"User {self.userId} lacks system permission to delete access")
        return False

    # The stored record tells us which organisation is affected.
    rows = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    current = rows[0] if rows else None
    if not current:
        logger.warning(f"Access record {accessId} not found")
        return False
    organisationId = current.get("organisationId")

    # Feature-level check is skipped for users with ALL access level.
    hasGlobalAccess = self.getRbacAccessLevel(TrusteeAccess, "delete") == AccessLevel.ALL
    if not hasGlobalAccess:
        if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
            logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
            return False

    return self.db.recordDelete(TrusteeAccess, accessId)
# ===== Contract CRUD =====
def createContract(self, data: Dict[str, Any]) -> Optional[TrusteeContract]:
    """Store a new contract for the organisation named in the payload.

    Permission is decided by checkCombinedPermission for that organisation.
    "mandateId" is always taken from the current context, "featureInstanceId"
    only when the payload has none; a missing "id" is replaced by a generated
    UUID. The payload dict is modified in place.

    Returns:
        The created contract, or None when permission is missing or the
        database did not return a created record.
    """
    organisationId = data.get("organisationId")

    # Combined check: system RBAC plus feature-level permission.
    if not self.checkCombinedPermission(TrusteeContract, "create", organisationId):
        logger.warning(f"User {self.userId} lacks permission to create contract for org {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    stored = self.db.recordCreate(TrusteeContract, data)
    if not stored or not stored.get("id"):
        return None
    publicFields = {name: val for name, val in stored.items() if not name.startswith("_")}
    return TrusteeContract(**publicFields)
def getContract(self, contractId: str) -> Optional[TrusteeContract]:
    """Load one contract by its ID, or return None when it does not exist."""
    rows = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    firstRow = rows[0] if rows else None
    if firstRow is None:
        return None
    # Keys starting with "_" are dropped before building the model.
    return TrusteeContract(**{name: val for name, val in firstRow.items() if not name.startswith("_")})
def getAllContracts(self, params: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List contracts through the RBAC-aware recordset helper.

    No record filter is applied; ``params`` is forwarded as the pagination
    argument. The helper's result is returned unchanged.
    """
    queryArgs = dict(
        connector=self.db,
        modelClass=TrusteeContract,
        currentUser=self.currentUser,
        pagination=params,
        recordFilter=None,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE,
    )
    return getRecordsetPaginatedWithRBAC(**queryArgs)
def getContractsByOrganisation(self, organisationId: str, pagination: Optional[PaginationParams] = None) -> Union[List[TrusteeContract], PaginatedResult]:
    """List the contracts of one organisation as ``TrusteeContract`` models.

    Rows are fetched through the RBAC-aware helper filtered by
    ``organisationId``. When the helper yields a ``PaginatedResult`` its
    ``items`` are replaced by models and the same object is returned;
    otherwise a plain list of models is returned.
    """
    def _asContract(row):
        # Drop "_"-prefixed metadata keys before model construction
        return TrusteeContract(**{k: v for k, v in row.items() if not k.startswith("_")})

    fetched = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeContract,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"organisationId": organisationId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if not isinstance(fetched, PaginatedResult):
        return [_asContract(row) for row in fetched]
    fetched.items = [_asContract(row) for row in fetched.items]
    return fetched
def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[TrusteeContract]:
    """Modify an existing contract; its organisationId may not change.

    The stored record supplies the organisation used for the permission
    check. ``data["id"]`` is set to ``contractId`` in place before the write.

    Returns:
        The updated contract, or None when the contract does not exist,
        permission is denied, a different organisationId is supplied, or
        the database reports no updated record.
    """
    found = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    if not found or not found[0]:
        logger.warning(f"Contract {contractId} not found")
        return None
    organisationId = found[0].get("organisationId")

    # Combined check: system RBAC plus feature-level role for the organisation
    if not self.checkCombinedPermission(TrusteeContract, "update", organisationId):
        logger.warning(f"User {self.userId} lacks permission to update contract in org {organisationId}")
        return None

    # organisationId is immutable after creation
    wantsOrgChange = "organisationId" in data and data["organisationId"] != organisationId
    if wantsOrgChange:
        logger.error("Contract organisationId cannot be changed after creation")
        return None

    data["id"] = contractId
    modified = self.db.recordModify(TrusteeContract, contractId, data)
    if modified:
        return TrusteeContract(**{key: value for key, value in modified.items() if not key.startswith("_")})
    return None
def deleteContract(self, contractId: str) -> bool:
    """Remove a contract if it exists and the caller may delete it.

    Returns False when the contract is not found or permission is denied;
    otherwise returns whatever the connector's ``recordDelete`` reports.
    """
    found = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    if not found or not found[0]:
        logger.warning(f"Contract {contractId} not found")
        return False

    # The stored organisation drives the feature-level permission check
    organisationId = found[0].get("organisationId")
    mayDelete = self.checkCombinedPermission(TrusteeContract, "delete", organisationId)
    if mayDelete:
        return self.db.recordDelete(TrusteeContract, contractId)
    logger.warning(f"User {self.userId} lacks permission to delete contract in org {organisationId}")
    return False
# ===== Document CRUD =====
def createDocument(self, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
    """Persist a new document in the current mandate / feature instance.

    Permission is checked via ``checkCombinedPermission`` without an
    organisation. ``data`` is modified in place: ``mandateId`` and
    ``featureInstanceId`` are always overwritten, and ``id`` receives a
    fresh UUID when missing or falsy.

    Returns:
        The created document, or None when permission is denied or the
        database returns no record carrying an id.
    """
    if not self.checkCombinedPermission(TrusteeDocument, "create"):
        logger.warning(f"User {self.userId} lacks permission to create document")
        return None

    # Context fields always come from this interface instance
    data.update({"mandateId": self.mandateId, "featureInstanceId": self.featureInstanceId})
    data["id"] = data.get("id") or str(uuid.uuid4())

    stored = self.db.recordCreate(TrusteeDocument, data)
    if not stored or not stored.get("id"):
        return None
    # Keys starting with "_" are connector metadata, not model fields
    modelFields = {name: val for name, val in stored.items() if not name.startswith("_")}
    return TrusteeDocument(**modelFields)
def getDocument(self, documentId: str) -> Optional[TrusteeDocument]:
    """Load one document's metadata by ID; returns None when no row matches.

    The ``documentData`` key and "_"-prefixed metadata keys are excluded
    before the model is built.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if matches:
        metadataOnly = {
            name: val
            for name, val in matches[0].items()
            if name != "documentData" and not name.startswith("_")
        }
        return TrusteeDocument(**metadataOnly)
    return None
def getDocumentData(self, documentId: str) -> Optional[bytes]:
    """Fetch the binary content belonging to a document.

    When the document row carries a ``fileId``, the content is read through
    the DB-management interface's ``getFileData``; a missing file is logged
    and yields None. Rows without a ``fileId`` fall back to the legacy
    ``documentData`` value stored on the row itself.

    Returns:
        The file content, or None when the document or its data is missing.
    """
    rows = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if not rows or not rows[0]:
        return None
    row = rows[0]

    fileId = row.get("fileId")
    if not fileId:
        # Legacy fallback: documentData was stored directly (for migration)
        return row.get("documentData")

    # Imported at call time rather than at module level
    from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
    fileStore = getDbInterface(self.currentUser, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId)
    content = fileStore.getFileData(fileId)
    if not content:
        logger.warning(f"File data not found for fileId {fileId}")
        return None
    return content
def getAllDocuments(self, params: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List document metadata through the RBAC-aware recordset helper.

    Each row is validated via ``TrusteeDocument`` (without ``documentData``
    and "_"-prefixed keys), dumped back to a dict, and then has the row's
    ``*Label`` columns merged in. For a ``PaginatedResult`` the ``items`` are
    replaced and the same object is returned; otherwise a list of dicts is
    returned.
    """
    def _toMetadataDict(row):
        # "*Label" columns are re-applied on top of the model dump
        labels = {k: v for k, v in row.items() if k.endswith("Label")}
        modelFields = {k: v for k, v in row.items() if k != "documentData" and not k.startswith("_")}
        dumped = TrusteeDocument(**modelFields).model_dump()
        dumped.update(labels)
        return dumped

    fetched = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeDocument,
        currentUser=self.currentUser,
        pagination=params,
        recordFilter=None,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if not isinstance(fetched, PaginatedResult):
        return [_toMetadataDict(row) for row in fetched]
    fetched.items = [_toMetadataDict(row) for row in fetched.items]
    return fetched
def getDocumentsByContract(self, contractId: str, pagination: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List document metadata for one contract.

    Rows are fetched through the RBAC-aware helper filtered by ``contractId``.
    Each row is validated via ``TrusteeDocument`` (without ``documentData``
    and "_"-prefixed keys), dumped back to a dict, and then has the row's
    ``*Label`` columns merged in.

    The return annotation is ``List[Dict]`` because the items are
    ``model_dump()`` dicts, not ``TrusteeDocument`` instances; this matches
    ``getAllDocuments``.

    Returns:
        A ``PaginatedResult`` whose ``items`` are dicts when the helper
        returns one, otherwise a list of dicts.
    """
    def _toMetadataDict(row):
        # "*Label" columns are re-applied on top of the model dump
        labels = {k: v for k, v in row.items() if k.endswith("Label")}
        modelFields = {k: v for k, v in row.items() if not k.startswith("_") and k != "documentData"}
        dumped = TrusteeDocument(**modelFields).model_dump()
        dumped.update(labels)
        return dumped

    result = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteeDocument,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"contractId": contractId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if isinstance(result, PaginatedResult):
        result.items = [_toMetadataDict(row) for row in result.items]
        return result
    return [_toMetadataDict(row) for row in result]
def updateDocument(self, documentId: str, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
    """Modify an existing document and return its metadata.

    The stored row's ``sysCreatedBy`` is passed to ``checkCombinedPermission``
    as ``recordCreatedBy``. ``data["id"]`` is set to ``documentId`` in place
    before the write. ``documentData`` and "_"-prefixed keys are excluded
    from the returned model.

    NOTE(review): no organisationId is passed to the permission check, and
    checkCombinedPermission returns before consulting recordCreatedBy in
    that case - confirm that own-record restrictions are enforced elsewhere.

    Returns:
        The updated document, or None when it does not exist, permission is
        denied, or the database reports no updated record.
    """
    found = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if not found or not found[0]:
        logger.warning(f"Document {documentId} not found")
        return None

    ownerId = found[0].get("sysCreatedBy")
    if not self.checkCombinedPermission(TrusteeDocument, "update", recordCreatedBy=ownerId):
        logger.warning(f"User {self.userId} lacks permission to update document")
        return None

    data["id"] = documentId
    modified = self.db.recordModify(TrusteeDocument, documentId, data)
    if modified:
        metadataOnly = {
            name: val
            for name, val in modified.items()
            if name != "documentData" and not name.startswith("_")
        }
        return TrusteeDocument(**metadataOnly)
    return None
def deleteDocument(self, documentId: str) -> bool:
    """Remove a document and detach it from positions that reference it.

    Before deletion, every position whose ``documentId`` or
    ``bankDocumentId`` equals this document has that field set to None.

    Returns False when the document is not found or permission is denied;
    otherwise returns whatever the connector's ``recordDelete`` reports.
    """
    found = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if not found or not found[0]:
        logger.warning(f"Document {documentId} not found")
        return False

    ownerId = found[0].get("sysCreatedBy")
    if not self.checkCombinedPermission(TrusteeDocument, "delete", recordCreatedBy=ownerId):
        logger.warning(f"User {self.userId} lacks permission to delete document")
        return False

    # Null out both reference columns on positions pointing at this document
    for refColumn in ("documentId", "bankDocumentId"):
        referencing = self.db.getRecordset(TrusteePosition, recordFilter={refColumn: documentId})
        for positionRow in referencing:
            positionKey = positionRow.get("id")
            if not positionKey:
                continue
            self.db.recordModify(TrusteePosition, positionKey, {refColumn: None})

    return self.db.recordDelete(TrusteeDocument, documentId)
# ===== Position CRUD =====
def _toTrusteePositionOrDelete(self, rawRecord: Dict[str, Any], deleteCorrupt: bool = True) -> Optional[TrusteePosition]:
    """Turn a raw DB row into a ``TrusteePosition``, repairing or purging bad rows.

    The row is stripped of "_"-prefixed keys and passed through
    ``_sanitisePositionPayload``. If the model validates and the sanitised
    payload differs from the stripped row, the sanitised payload is written
    back (write failures are logged, not raised). If validation fails, the
    error is logged and - when ``deleteCorrupt`` is true and an id is known -
    the row is deleted from the database.

    Returns:
        The validated model, or None for an empty row or a validation failure.
    """
    stripped = {
        key: value
        for key, value in (rawRecord or {}).items()
        if key == "sysCreatedAt" or not key.startswith("_")
    }
    if not stripped:
        return None

    sanitised = _sanitisePositionPayload(stripped)
    recordId = sanitised.get("id") or stripped.get("id")

    try:
        position = TrusteePosition(**sanitised)
    except ValidationError as err:
        logger.error(f"Corrupt TrusteePosition record detected (id={recordId}): {err}")
        if deleteCorrupt and recordId:
            try:
                wasDeleted = self.db.recordDelete(TrusteePosition, recordId)
                if wasDeleted:
                    logger.warning(f"Deleted corrupt TrusteePosition record: {recordId}")
                else:
                    logger.warning(f"Failed to delete corrupt TrusteePosition record: {recordId}")
            except Exception as deleteErr:
                logger.error(f"Error deleting corrupt TrusteePosition record {recordId}: {deleteErr}")
        return None

    # Persist the sanitised form only when sanitising actually changed something
    needsWriteBack = recordId and sanitised != stripped
    if needsWriteBack:
        try:
            self.db.recordModify(TrusteePosition, recordId, sanitised)
            logger.info(f"Normalised legacy TrusteePosition record: {recordId}")
        except Exception as writeErr:
            logger.warning(f"Could not persist normalised TrusteePosition {recordId}: {writeErr}")
    return position
def createPosition(self, data: Dict[str, Any]) -> Optional[TrusteePosition]:
    """Create a new position in the current mandate / feature instance.

    Permission is checked via ``checkCombinedPermission`` without an
    organisation. The payload is passed through ``_sanitisePositionPayload``
    and the result is given ``mandateId``, ``featureInstanceId`` and an
    ``id`` (fresh UUID when missing or falsy).

    When ``vatAmount`` is missing, None, empty or zero it is derived as
    ``bookingAmount * vatPercentage / 100``. A missing or None
    ``bookingAmount`` / ``vatPercentage`` counts as 0, so a payload that
    carries explicit None values no longer raises ``TypeError``.

    Returns:
        The created position, or None when permission is denied, the
        database returns no record carrying an id, or the created record
        fails model validation.
    """
    # Check system RBAC permission
    if not self.checkCombinedPermission(TrusteePosition, "create"):
        logger.warning(f"User {self.userId} lacks permission to create position")
        return None

    # Failsafe normalisation to keep DB payload stable for AI/manual inputs.
    data = _sanitisePositionPayload(data)

    # Auto-set context fields
    data["mandateId"] = self.mandateId
    data["featureInstanceId"] = self.featureInstanceId

    # Calculate VAT amount if not provided (absent, None, empty or zero)
    if not data.get("vatAmount"):
        # `or 0` also covers keys that are present but hold None
        bookingAmount = data.get("bookingAmount") or 0
        vatPercentage = data.get("vatPercentage") or 0
        data["vatAmount"] = bookingAmount * vatPercentage / 100

    data["id"] = data.get("id") or str(uuid.uuid4())

    createdRecord = self.db.recordCreate(TrusteePosition, data)
    if createdRecord and createdRecord.get("id"):
        # Never delete the row that was just created, even if it fails validation
        return self._toTrusteePositionOrDelete(createdRecord, deleteCorrupt=False)
    return None
def getPosition(self, positionId: str) -> Optional[TrusteePosition]:
    """Load one position by ID.

    Returns None when no row matches. The row is converted with
    ``_toTrusteePositionOrDelete`` with ``deleteCorrupt=True``.
    """
    matches = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    if matches:
        return self._toTrusteePositionOrDelete(matches[0], deleteCorrupt=True)
    return None
def getAllPositions(self, params: Optional[PaginationParams] = None) -> Union[List[Dict], PaginatedResult]:
    """List positions as dicts through the RBAC-aware recordset helper.

    Every row is stripped of "_"-prefixed keys, validated with
    ``_toTrusteePositionOrDelete`` (``deleteCorrupt=True``), dumped to a
    dict, and then has the row's ``*Label`` columns merged in. Rows that
    fail validation are omitted from the output.

    NOTE(post-process): because invalid rows are dropped after the query,
    a paginated result's total may overcount relative to the returned items.
    """
    preservedKeys = {'sysCreatedAt'}

    def _toPublicDict(row):
        labels = {k: v for k, v in row.items() if k.endswith("Label")}
        visible = {k: v for k, v in row.items() if k in preservedKeys or not k.startswith("_")}
        position = self._toTrusteePositionOrDelete(visible, deleteCorrupt=True)
        if position is None:
            return None
        dumped = position.model_dump()
        dumped.update(labels)
        return dumped

    def _convertAll(rows):
        # Lazily convert in order, keeping only rows that validated
        converted = (_toPublicDict(row) for row in rows)
        return [entry for entry in converted if entry is not None]

    fetched = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        pagination=params,
        recordFilter=None,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if not isinstance(fetched, PaginatedResult):
        return _convertAll(fetched)
    fetched.items = _convertAll(fetched.items)
    return fetched
def getPositionsByContract(self, contractId: str, pagination: Optional[PaginationParams] = None) -> Union[List[TrusteePosition], PaginatedResult]:
    """List the positions whose ``contractId`` matches.

    Rows come from the RBAC-aware helper and are converted with
    ``_toTrusteePositionOrDelete`` (``deleteCorrupt=True``); rows that fail
    validation are omitted. For a ``PaginatedResult`` the ``items`` are
    replaced and the same object is returned.
    """
    def _onlyValid(rows):
        converted = (self._toTrusteePositionOrDelete(row, deleteCorrupt=True) for row in rows)
        return [model for model in converted if model is not None]

    fetched = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"contractId": contractId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if not isinstance(fetched, PaginatedResult):
        return _onlyValid(fetched)
    fetched.items = _onlyValid(fetched.items)
    return fetched
def getPositionsByOrganisation(self, organisationId: str, pagination: Optional[PaginationParams] = None) -> Union[List[TrusteePosition], PaginatedResult]:
    """List the positions whose ``organisationId`` matches.

    Rows come from the RBAC-aware helper and are converted with
    ``_toTrusteePositionOrDelete`` (``deleteCorrupt=True``); rows that fail
    validation are omitted. For a ``PaginatedResult`` the ``items`` are
    replaced and the same object is returned.
    """
    def _onlyValid(rows):
        converted = (self._toTrusteePositionOrDelete(row, deleteCorrupt=True) for row in rows)
        return [model for model in converted if model is not None]

    fetched = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"organisationId": organisationId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if not isinstance(fetched, PaginatedResult):
        return _onlyValid(fetched)
    fetched.items = _onlyValid(fetched.items)
    return fetched
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]:
    """Modify an existing position.

    The stored row's ``sysCreatedBy`` is passed to ``checkCombinedPermission``
    as ``recordCreatedBy``. ``data["id"]`` is set to ``positionId`` in place
    and the payload is written as given. The updated row is converted with
    ``_toTrusteePositionOrDelete`` with ``deleteCorrupt=False``.

    Returns:
        The updated position, or None when it does not exist, permission is
        denied, the database reports no updated record, or the updated row
        fails validation.
    """
    found = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    if not found or not found[0]:
        logger.warning(f"Position {positionId} not found")
        return None

    ownerId = found[0].get("sysCreatedBy")
    mayUpdate = self.checkCombinedPermission(TrusteePosition, "update", recordCreatedBy=ownerId)
    if not mayUpdate:
        logger.warning(f"User {self.userId} lacks permission to update position")
        return None

    data["id"] = positionId
    modified = self.db.recordModify(TrusteePosition, positionId, data)
    if modified:
        return self._toTrusteePositionOrDelete(modified, deleteCorrupt=False)
    return None
def deletePosition(self, positionId: str) -> bool:
    """Remove a position if it exists and the caller may delete it.

    Returns False when the position is not found or permission is denied;
    otherwise returns whatever the connector's ``recordDelete`` reports.
    """
    found = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    if not found or not found[0]:
        logger.warning(f"Position {positionId} not found")
        return False

    ownerId = found[0].get("sysCreatedBy")
    if self.checkCombinedPermission(TrusteePosition, "delete", recordCreatedBy=ownerId):
        return self.db.recordDelete(TrusteePosition, positionId)
    logger.warning(f"User {self.userId} lacks permission to delete position")
    return False
# ===== Position-Document Queries =====
def getPositionsByDocument(self, documentId: str, pagination: Optional[PaginationParams] = None) -> Union[List[TrusteePosition], PaginatedResult]:
    """List the positions whose ``documentId`` references the given document.

    Rows come from the RBAC-aware helper and are converted with
    ``_toTrusteePositionOrDelete`` (``deleteCorrupt=True``); rows that fail
    validation are omitted. For a ``PaginatedResult`` the ``items`` are
    replaced and the same object is returned.
    """
    def _onlyValid(rows):
        converted = (self._toTrusteePositionOrDelete(row, deleteCorrupt=True) for row in rows)
        return [model for model in converted if model is not None]

    fetched = getRecordsetPaginatedWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        pagination=pagination,
        recordFilter={"documentId": documentId},
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    if not isinstance(fetched, PaginatedResult):
        return _onlyValid(fetched)
    fetched.items = _onlyValid(fetched.items)
    return fetched
# ===== Trustee-specific Access Check =====
def getUserAccessForOrganisation(self, userId: str, organisationId: str) -> List[Dict[str, Any]]:
    """Return the TrusteeAccess rows matching both the user and the organisation."""
    criteria = {"userId": userId, "organisationId": organisationId}
    # Filter is passed positionally, as the second argument
    return self.db.getRecordset(TrusteeAccess, criteria)
def getAllUserAccess(self, userId: str) -> List[Dict[str, Any]]:
    """Return every TrusteeAccess row for the user, regardless of organisation."""
    byUser = {"userId": userId}
    return self.db.getRecordset(TrusteeAccess, recordFilter=byUser)
def getUserTrusteeRoles(self, userId: str, organisationId: str, contractId: Optional[str] = None) -> List[str]:
    """
    Collect the distinct role IDs a user holds in an organisation.

    An access row contributes its role when any of the following holds:
    the row has no contractId (organisation-wide grant), the row's
    contractId equals the requested ``contractId``, or no contract filter
    was requested (``contractId`` is None).

    Args:
        userId: User ID to check
        organisationId: Organisation ID
        contractId: Optional contract ID for contract-specific access

    Returns:
        Role IDs in first-seen order, without duplicates
    """
    collected = []
    for entry in self.getUserAccessForOrganisation(userId, organisationId):
        scopedContract = entry.get("contractId")
        candidate = entry.get("roleId")
        applies = (
            scopedContract is None
            or (contractId and scopedContract == contractId)
            or contractId is None
        )
        if applies and candidate not in collected:
            collected.append(candidate)
    return collected
def checkUserTrusteePermission(
    self,
    userId: str,
    organisationId: str,
    requiredRole: str,
    contractId: Optional[str] = None
) -> bool:
    """
    Tell whether a user holds a given role in an organisation.

    The roles are obtained from ``getUserTrusteeRoles`` with the same user,
    organisation and optional contract.

    Args:
        userId: User ID to check
        organisationId: Organisation ID
        requiredRole: Required role (userreport, admin, operate)
        contractId: Optional contract ID for contract-specific access

    Returns:
        True if ``requiredRole`` is among the user's roles
    """
    heldRoles = self.getUserTrusteeRoles(userId, organisationId, contractId)
    isGranted = requiredRole in heldRoles
    return isGranted
def checkCombinedPermission(
    self,
    modelClass: type,
    operation: str,
    organisationId: Optional[str] = None,
    contractId: Optional[str] = None,
    recordCreatedBy: Optional[str] = None
) -> bool:
    """
    Decide whether the current user may perform an operation on a model.

    Evaluation order:
      1. System RBAC access level: NONE denies, ALL allows.
      2. Without an organisationId the system level alone is sufficient.
      3. Otherwise the user's trustee roles for the organisation decide:
         - admin: everything
         - operate: any operation on contracts/documents/positions,
           read on organisations
         - userreport: create on documents/positions, other operations
           on them only when ``recordCreatedBy`` equals the current user;
           read on organisations and contracts

    Args:
        modelClass: The model class (e.g., TrusteeContract)
        operation: Operation type (read, create, update, delete)
        organisationId: Optional organisation ID for feature-level check
        contractId: Optional contract ID for contract-level filtering
        recordCreatedBy: Optional creator ID for userreport role checks

    Returns:
        True if user has permission
    """
    # Step 1: system RBAC
    systemLevel = self.getRbacAccessLevel(modelClass, operation)
    if systemLevel == AccessLevel.NONE:
        return False
    # Step 2: ALL bypasses feature-level checks; no organisation means nothing further to check
    if systemLevel == AccessLevel.ALL or organisationId is None:
        return True

    # Step 3: feature-level roles from trustee.access
    heldRoles = self.getUserTrusteeRoles(self.userId, organisationId, contractId)
    if not heldRoles:
        return False
    if "admin" in heldRoles:
        return True

    isRead = operation == "read"

    if "operate" in heldRoles:
        worksOnOperateModel = modelClass in (TrusteeContract, TrusteeDocument, TrusteePosition)
        readsOrganisation = modelClass == TrusteeOrganisation and isRead
        if worksOnOperateModel or readsOrganisation:
            return True

    if "userreport" in heldRoles:
        if modelClass in (TrusteeDocument, TrusteePosition):
            # Creating is always allowed; anything else only on own records
            if operation == "create" or recordCreatedBy == self.userId:
                return True
        if modelClass in (TrusteeOrganisation, TrusteeContract) and isRead:
            return True

    return False