1600 lines
67 KiB
Python
1600 lines
67 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Interface to Trustee database.
|
|
Manages trustee organisations, roles, access, contracts, documents, and positions.
|
|
"""
|
|
|
|
import logging
|
|
import math
|
|
import uuid
|
|
from typing import Dict, Any, List, Optional, Union
|
|
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
|
|
from modules.security.rbac import RbacClass
|
|
from modules.datamodels.datamodelUam import User, AccessLevel
|
|
from modules.datamodels.datamodelRbac import AccessRuleContext
|
|
from .datamodelFeatureTrustee import (
|
|
TrusteeOrganisation,
|
|
TrusteeRole,
|
|
TrusteeAccess,
|
|
TrusteeContract,
|
|
TrusteeDocument,
|
|
TrusteePosition,
|
|
TrusteePositionDocument,
|
|
)
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Module-level cache of TrusteeObjects instances, one per context.
# getInterface() keys it with "<userId>_<mandateId>_<featureInstanceId>".
_trusteeInterfaces: Dict[str, "TrusteeObjects"] = {}
|
|
|
|
|
|
def getInterface(currentUser: User, mandateId: Optional[Union[str, uuid.UUID]] = None, featureInstanceId: Optional[str] = None) -> "TrusteeObjects":
    """Return the cached TrusteeObjects for a user context, creating it on first use.

    Args:
        currentUser: The authenticated user; must be truthy and carry a truthy ``id``.
        mandateId: Mandate ID from RequestContext (X-Mandate-Id header). Stringified
            when truthy, otherwise treated as None.
        featureInstanceId: Feature instance ID from RequestContext
            (X-Feature-Instance-Id header), normalised the same way.

    Returns:
        The TrusteeObjects instance stored under the context's cache key. An
        already cached instance gets its user context refreshed first.

    Raises:
        ValueError: If currentUser is missing or has no id.
    """
    if not (currentUser and currentUser.id):
        raise ValueError("Valid user context required")

    mandateKey = str(mandateId) if mandateId else None
    instanceKey = str(featureInstanceId) if featureInstanceId else None

    # featureInstanceId is part of the key so each feature instance is isolated
    cacheKey = f"{currentUser.id}_{mandateKey}_{instanceKey}"

    if cacheKey in _trusteeInterfaces:
        # Known context: refresh the user data held by the cached interface
        _trusteeInterfaces[cacheKey].setUserContext(currentUser, mandateId=mandateKey, featureInstanceId=instanceKey)
    else:
        _trusteeInterfaces[cacheKey] = TrusteeObjects(currentUser, mandateId=mandateKey, featureInstanceId=instanceKey)

    return _trusteeInterfaces[cacheKey]
|
|
|
|
|
|
class TrusteeObjects:
    """
    Interface to Trustee database.

    Manages trustee organisations, roles, access, contracts, documents, and positions.
    Each instance carries one user context (user, mandateId, featureInstanceId) that is
    used for the RBAC checks performed by its methods.
    """

    # Feature code for RBAC objectKey construction
    # Used to build: data.feature.trustee.{TableName}
    # Passed as featureCode to getRecordsetWithRBAC by the list methods.
    FEATURE_CODE: str = "trustee"
|
|
|
|
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
|
|
"""Initializes the Trustee Interface.
|
|
|
|
Args:
|
|
currentUser: The authenticated user
|
|
mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
|
|
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
|
|
"""
|
|
self.currentUser = currentUser
|
|
self.userId = currentUser.id if currentUser else None
|
|
# Use mandateId from parameter (Request-Context), not from user object
|
|
self.mandateId = mandateId
|
|
self.featureInstanceId = featureInstanceId
|
|
self.rbac = None
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
# Set user context if provided
|
|
if currentUser:
|
|
self.setUserContext(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
|
|
|
|
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
|
|
"""Sets the user context for the interface.
|
|
|
|
Args:
|
|
currentUser: The authenticated user
|
|
mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
|
|
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
|
|
"""
|
|
if not currentUser:
|
|
logger.info("Initializing interface without user context")
|
|
return
|
|
|
|
self.currentUser = currentUser
|
|
self.userId = currentUser.id
|
|
# Use mandateId from parameter (Request-Context), not from user object
|
|
self.mandateId = mandateId
|
|
self.featureInstanceId = featureInstanceId
|
|
|
|
if not self.userId:
|
|
raise ValueError("Invalid user context: id is required")
|
|
|
|
# Note: mandateId is ALWAYS optional here - it comes from Request-Context, not from User.
|
|
# Users are NOT assigned to mandates by design - they get mandate context from the request.
|
|
# sysAdmin users can additionally perform cross-mandate operations.
|
|
# Without mandateId, operations will be filtered to accessible mandates via RBAC.
|
|
|
|
self.userLanguage = currentUser.language
|
|
|
|
# Initialize RBAC interface
|
|
from modules.security.rootAccess import getRootDbAppConnector
|
|
dbApp = getRootDbAppConnector()
|
|
self.rbac = RbacClass(self.db, dbApp=dbApp)
|
|
|
|
# Update database context
|
|
self.db.updateContext(self.userId)
|
|
|
|
def __del__(self):
|
|
"""Cleanup method to close database connection."""
|
|
if hasattr(self, "db") and self.db is not None:
|
|
try:
|
|
self.db.close()
|
|
except Exception as e:
|
|
logger.error(f"Error closing database connection: {e}")
|
|
|
|
def _initializeDatabase(self):
    """Create the DatabaseConnector for the trustee database and initialise it.

    Host, user, password and port are read from APP_CONFIG; the database name
    is fixed to "poweron_trustee". The connector is stored on ``self.db``.

    Raises:
        Exception: Any error from building or initialising the connector is
            logged and re-raised unchanged.
    """
    try:
        connectionSettings = {
            "dbHost": APP_CONFIG.get("DB_HOST", "_no_config_default_data"),
            "dbDatabase": "poweron_trustee",
            "dbUser": APP_CONFIG.get("DB_USER"),
            "dbPassword": APP_CONFIG.get("DB_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_PORT", 5432)),
            "userId": self.userId,
        }
        self.db = DatabaseConnector(**connectionSettings)
        self.db.initDbSystem()
        logger.info(f"Trustee database initialized successfully for user {self.userId}")
    except Exception as e:
        logger.error(f"Failed to initialize Trustee database: {str(e)}")
        raise
|
|
|
|
def checkRbacPermission(
|
|
self,
|
|
modelClass: type,
|
|
operation: str,
|
|
recordId: Optional[str] = None
|
|
) -> bool:
|
|
"""Check RBAC permission for a specific operation on a table."""
|
|
if not self.rbac or not self.currentUser:
|
|
return False
|
|
|
|
tableName = modelClass.__name__
|
|
permissions = self.rbac.getUserPermissions(
|
|
self.currentUser,
|
|
AccessRuleContext.DATA,
|
|
tableName,
|
|
mandateId=self.mandateId,
|
|
featureInstanceId=self.featureInstanceId
|
|
)
|
|
|
|
if not permissions.view:
|
|
return False
|
|
|
|
permLevel = getattr(permissions, operation, AccessLevel.NONE)
|
|
if permLevel == AccessLevel.NONE:
|
|
return False
|
|
|
|
return True
|
|
|
|
def getRbacAccessLevel(self, modelClass: type, operation: str) -> AccessLevel:
    """Look up the RBAC access level of the current user for one operation on a table.

    Args:
        modelClass: Model class whose ``__name__`` is used as the table name.
        operation: Name of the permission attribute to read (e.g. "read").

    Returns:
        The access level stored for the operation. AccessLevel.NONE is returned
        when there is no RBAC interface or user, when the user lacks view
        permission, or when the permissions object has no such attribute.
    """
    if not (self.rbac and self.currentUser):
        return AccessLevel.NONE

    granted = self.rbac.getUserPermissions(
        self.currentUser,
        AccessRuleContext.DATA,
        modelClass.__name__,
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId
    )

    if granted.view:
        return getattr(granted, operation, AccessLevel.NONE)
    return AccessLevel.NONE
|
|
|
|
# ===== Pagination Helper Functions =====
|
|
|
|
def _applyFilters(self, records: List[Dict[str, Any]], params: Optional[PaginationParams]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Apply filter criteria to records.
|
|
|
|
Supports:
|
|
- General search: params.filters["search"] - searches across all text fields
|
|
- Field-specific filters: params.filters
|
|
- Simple: {"status": "running"} - equals match
|
|
- With operator: {"status": {"operator": "equals", "value": "running"}}
|
|
- Operators: equals, contains, gt, gte, lt, lte, in, notIn, startsWith, endsWith
|
|
|
|
Args:
|
|
records: List of record dictionaries to filter
|
|
params: PaginationParams with filters (search is inside filters)
|
|
|
|
Returns:
|
|
Filtered list of records
|
|
"""
|
|
if not params or not records:
|
|
return records
|
|
|
|
# Get filters safely (may be None)
|
|
filters = getattr(params, 'filters', None)
|
|
if not filters:
|
|
return records
|
|
|
|
filtered = records
|
|
|
|
# Handle general search across text fields (search is inside filters)
|
|
searchTerm = filters.get("search") if isinstance(filters, dict) else None
|
|
if searchTerm:
|
|
searchTerm = str(searchTerm).lower()
|
|
if searchTerm:
|
|
searchFiltered = []
|
|
for record in filtered:
|
|
found = False
|
|
for key, value in record.items():
|
|
if isinstance(value, str) and searchTerm in value.lower():
|
|
found = True
|
|
break
|
|
elif isinstance(value, (int, float)) and searchTerm in str(value):
|
|
found = True
|
|
break
|
|
if found:
|
|
searchFiltered.append(record)
|
|
filtered = searchFiltered
|
|
|
|
# Handle field-specific filters
|
|
if filters:
|
|
for fieldName, filterValue in filters.items():
|
|
if fieldName == "search":
|
|
continue # Already handled above
|
|
|
|
fieldFiltered = []
|
|
for record in filtered:
|
|
if fieldName not in record:
|
|
continue
|
|
|
|
recordValue = record.get(fieldName)
|
|
|
|
# Handle simple value (equals operator)
|
|
if not isinstance(filterValue, dict):
|
|
if recordValue == filterValue:
|
|
fieldFiltered.append(record)
|
|
continue
|
|
|
|
# Handle filter with operator
|
|
operator = filterValue.get("operator", "equals")
|
|
filterVal = filterValue.get("value")
|
|
|
|
matches = False
|
|
if operator in ["equals", "eq"]:
|
|
matches = recordValue == filterVal
|
|
|
|
elif operator == "contains":
|
|
recordStr = str(recordValue).lower() if recordValue is not None else ""
|
|
filterStr = str(filterVal).lower() if filterVal is not None else ""
|
|
matches = filterStr in recordStr
|
|
|
|
elif operator == "startsWith":
|
|
recordStr = str(recordValue).lower() if recordValue is not None else ""
|
|
filterStr = str(filterVal).lower() if filterVal is not None else ""
|
|
matches = recordStr.startswith(filterStr)
|
|
|
|
elif operator == "endsWith":
|
|
recordStr = str(recordValue).lower() if recordValue is not None else ""
|
|
filterStr = str(filterVal).lower() if filterVal is not None else ""
|
|
matches = recordStr.endswith(filterStr)
|
|
|
|
elif operator == "gt":
|
|
try:
|
|
recordNum = float(recordValue) if recordValue is not None else float('-inf')
|
|
filterNum = float(filterVal) if filterVal is not None else float('-inf')
|
|
matches = recordNum > filterNum
|
|
except (ValueError, TypeError):
|
|
matches = False
|
|
|
|
elif operator == "gte":
|
|
try:
|
|
recordNum = float(recordValue) if recordValue is not None else float('-inf')
|
|
filterNum = float(filterVal) if filterVal is not None else float('-inf')
|
|
matches = recordNum >= filterNum
|
|
except (ValueError, TypeError):
|
|
matches = False
|
|
|
|
elif operator == "lt":
|
|
try:
|
|
recordNum = float(recordValue) if recordValue is not None else float('inf')
|
|
filterNum = float(filterVal) if filterVal is not None else float('inf')
|
|
matches = recordNum < filterNum
|
|
except (ValueError, TypeError):
|
|
matches = False
|
|
|
|
elif operator == "lte":
|
|
try:
|
|
recordNum = float(recordValue) if recordValue is not None else float('inf')
|
|
filterNum = float(filterVal) if filterVal is not None else float('inf')
|
|
matches = recordNum <= filterNum
|
|
except (ValueError, TypeError):
|
|
matches = False
|
|
|
|
elif operator == "in":
|
|
if isinstance(filterVal, list):
|
|
matches = recordValue in filterVal
|
|
else:
|
|
matches = False
|
|
|
|
elif operator == "notIn":
|
|
if isinstance(filterVal, list):
|
|
matches = recordValue not in filterVal
|
|
else:
|
|
matches = False
|
|
|
|
if matches:
|
|
fieldFiltered.append(record)
|
|
|
|
filtered = fieldFiltered
|
|
|
|
return filtered
|
|
|
|
def _applySorting(self, records: List[Dict[str, Any]], params: Optional[PaginationParams]) -> List[Dict[str, Any]]:
|
|
"""Apply multi-level sorting to records using stable sort."""
|
|
if not params:
|
|
return records
|
|
|
|
# Get sort safely (may be None or empty list)
|
|
sortFields = getattr(params, 'sort', None)
|
|
if not sortFields:
|
|
return records
|
|
|
|
sortedRecords = list(records)
|
|
|
|
# Sort from least significant to most significant field (reverse order)
|
|
# Python's sort is stable, so this creates proper multi-level sorting
|
|
for sortField in reversed(sortFields):
|
|
# Handle both dict and object formats
|
|
if isinstance(sortField, dict):
|
|
fieldName = sortField.get("field")
|
|
direction = sortField.get("direction", "asc")
|
|
else:
|
|
fieldName = getattr(sortField, "field", None)
|
|
direction = getattr(sortField, "direction", "asc")
|
|
|
|
if not fieldName:
|
|
continue
|
|
|
|
isDesc = (direction == "desc")
|
|
|
|
def makeSortKey(fName):
|
|
def sortKey(record):
|
|
value = record.get(fName)
|
|
# Handle None values - place them at the end for both directions
|
|
if value is None:
|
|
return (1, "") # sorts after (0, ...)
|
|
else:
|
|
if isinstance(value, (int, float)):
|
|
return (0, value)
|
|
elif isinstance(value, str):
|
|
return (0, value.lower())
|
|
elif isinstance(value, bool):
|
|
return (0, value)
|
|
else:
|
|
return (0, str(value))
|
|
return sortKey
|
|
|
|
sortedRecords.sort(key=makeSortKey(fieldName), reverse=isDesc)
|
|
|
|
return sortedRecords
|
|
|
|
# ===== Organisation CRUD =====
|
|
|
|
def createOrganisation(self, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
    """Create a new organisation.

    ``data`` is updated in place: mandateId is always taken from the interface
    context, featureInstanceId only when the caller did not supply one.

    Args:
        data: Field values of the organisation. ``id`` is required and must be
            a string of 3-50 characters consisting only of letters, digits,
            hyphens and underscores.

    Returns:
        The created TrusteeOrganisation, or None when the user lacks the
        "create" permission, the ID is invalid, or the database returned no
        record with an id.
    """
    if not self.checkRbacPermission(TrusteeOrganisation, "create"):
        logger.warning(f"User {self.userId} lacks permission to create organisation")
        return None

    # Set mandateId and featureInstanceId from context for proper data isolation
    data["mandateId"] = self.mandateId
    if "featureInstanceId" not in data:
        data["featureInstanceId"] = self.featureInstanceId

    # Validate ID format (alphanumeric, hyphens, underscores, 3-50 chars)
    orgId = data.get("id", "")
    if not isinstance(orgId, str):
        # An explicit None or a non-string id must be rejected, not crash in len()
        logger.error(f"Invalid organisation ID type: {type(orgId).__name__}")
        return None
    if not 3 <= len(orgId) <= 50:
        logger.error(f"Invalid organisation ID length: {len(orgId)}")
        return None

    import re
    # fullmatch instead of match(...$): "$" also matches just before a
    # trailing newline, which would let an ID such as "abc\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', orgId):
        logger.error(f"Invalid organisation ID format: {orgId}")
        return None

    createdRecord = self.db.recordCreate(TrusteeOrganisation, data)
    if createdRecord and createdRecord.get("id"):
        return TrusteeOrganisation(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
    return None
|
|
|
|
def getOrganisation(self, orgId: str) -> Optional[TrusteeOrganisation]:
    """Fetch one organisation by its ID.

    Returns:
        The organisation built from the first matching record (keys starting
        with "_" are dropped), or None when no record matches.
    """
    matches = self.db.getRecordset(TrusteeOrganisation, recordFilter={"id": orgId})
    if matches:
        publicFields = {key: val for key, val in matches[0].items() if not key.startswith("_")}
        return TrusteeOrganisation(**publicFields)
    return None
|
|
|
|
def getAllOrganisations(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List organisations visible to the current user, optionally paginated.

    Note: Organisations are managed at system level (by mandate).
    Feature-level filtering (trustee.access) is NOT applied here because:
    - Admin users with system RBAC can manage all orgs in their mandate
    - trustee.access grants access to specific orgs for other users
    - New organisations wouldn't be visible without an access record

    Args:
        params: Optional pagination settings; only ``page`` (default 1) and
            ``pageSize`` (default 20) are used here. Without params all
            records are returned as a single page.

    Returns:
        PaginatedResult with the page items, the total number of records and
        the total number of pages.
    """
    logger.debug(f"getAllOrganisations called for user {self.userId}, mandateId: {self.mandateId}")

    # System RBAC filtering (filters by mandate for GROUP access level)
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeOrganisation,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    logger.debug(f"getAllOrganisations: getRecordsetWithRBAC returned {len(records)} records")

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    pageSize = params.pageSize or 20
    offset = ((params.page or 1) - 1) * pageSize
    pageItems = records[offset:offset + pageSize]
    pageCount = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
    return PaginatedResult(items=pageItems, totalItems=totalItems, totalPages=pageCount)
|
|
|
|
def updateOrganisation(self, orgId: str, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
    """Modify an existing organisation; its ID is immutable.

    Args:
        orgId: ID of the organisation to update.
        data: New field values. A differing ``id`` entry is rejected; the
            entry is otherwise set to ``orgId`` (``data`` is modified in place).

    Returns:
        The updated organisation, or None when the user lacks the "update"
        permission, an ID change was attempted, or the database returned nothing.
    """
    if not self.checkRbacPermission(TrusteeOrganisation, "update"):
        logger.warning(f"User {self.userId} lacks permission to update organisation")
        return None

    # ID cannot be changed after creation
    if "id" in data:
        if data["id"] != orgId:
            logger.error("Organisation ID cannot be changed")
            return None

    data["id"] = orgId
    modified = self.db.recordModify(TrusteeOrganisation, orgId, data)
    if modified:
        publicFields = {key: val for key, val in modified.items() if not key.startswith("_")}
        return TrusteeOrganisation(**publicFields)
    return None
|
|
|
|
def deleteOrganisation(self, orgId: str) -> bool:
    """Remove an organisation.

    Returns:
        False when the user lacks the "delete" permission; otherwise the
        result of the database delete call.
    """
    mayDelete = self.checkRbacPermission(TrusteeOrganisation, "delete")
    if mayDelete:
        return self.db.recordDelete(TrusteeOrganisation, orgId)

    logger.warning(f"User {self.userId} lacks permission to delete organisation")
    return False
|
|
|
|
# ===== Role CRUD =====
|
|
|
|
def createRole(self, data: Dict[str, Any]) -> Optional[TrusteeRole]:
    """Create a new role.

    ``data`` is updated in place: mandateId always comes from the interface
    context, featureInstanceId only when the caller did not supply one.

    Args:
        data: Field values of the role; a non-empty ``id`` is required.

    Returns:
        The created role, or None when the user lacks the "create" permission,
        no ID was given, or the database returned no record with an id.
    """
    if not self.checkRbacPermission(TrusteeRole, "create"):
        logger.warning(f"User {self.userId} lacks permission to create role")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)

    if not data.get("id", ""):
        logger.error("Role ID is required")
        return None

    stored = self.db.recordCreate(TrusteeRole, data)
    if not (stored and stored.get("id")):
        return None
    publicFields = {key: val for key, val in stored.items() if not key.startswith("_")}
    return TrusteeRole(**publicFields)
|
|
|
|
def getRole(self, roleId: str) -> Optional[TrusteeRole]:
    """Fetch one role by its ID.

    Returns:
        The role built from the first matching record (keys starting with "_"
        are dropped), or None when no record matches.
    """
    matches = self.db.getRecordset(TrusteeRole, recordFilter={"id": roleId})
    if matches:
        publicFields = {key: val for key, val in matches[0].items() if not key.startswith("_")}
        return TrusteeRole(**publicFields)
    return None
|
|
|
|
def getAllRoles(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List roles visible to the current user, optionally paginated.

    Roles are not filtered by organisation since they define the role types.
    Users whose "read" access level is ALL see every role returned by the
    system RBAC query; everyone else needs at least one trustee access record,
    otherwise the result is empty.

    Args:
        params: Optional pagination settings; only ``page`` (default 1) and
            ``pageSize`` (default 20) are used here.

    Returns:
        PaginatedResult with the page items, total record count and page count.
    """
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeRole,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    # Non-ALL users without any trustee access record see no roles at all
    if self.getRbacAccessLevel(TrusteeRole, "read") != AccessLevel.ALL and not self.getAllUserAccess(self.userId):
        records = []

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    pageSize = params.pageSize or 20
    offset = ((params.page or 1) - 1) * pageSize
    pageItems = records[offset:offset + pageSize]
    pageCount = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
    return PaginatedResult(items=pageItems, totalItems=totalItems, totalPages=pageCount)
|
|
|
|
def updateRole(self, roleId: str, data: Dict[str, Any]) -> Optional[TrusteeRole]:
    """Modify an existing role.

    Args:
        roleId: ID of the role to update.
        data: New field values; its ``id`` entry is overwritten with
            ``roleId`` (``data`` is modified in place).

    Returns:
        The updated role, or None when the user lacks the "update" permission
        or the database returned nothing.
    """
    if not self.checkRbacPermission(TrusteeRole, "update"):
        logger.warning(f"User {self.userId} lacks permission to update role")
        return None

    data["id"] = roleId
    modified = self.db.recordModify(TrusteeRole, roleId, data)
    if modified:
        publicFields = {key: val for key, val in modified.items() if not key.startswith("_")}
        return TrusteeRole(**publicFields)
    return None
|
|
|
|
def deleteRole(self, roleId: str) -> bool:
    """Remove a role unless it is still referenced by access records.

    Returns:
        False when the user lacks the "delete" permission or any TrusteeAccess
        record still uses the role; otherwise the result of the database
        delete call.
    """
    if not self.checkRbacPermission(TrusteeRole, "delete"):
        logger.warning(f"User {self.userId} lacks permission to delete role")
        return False

    # A role that is still assigned somewhere must not disappear
    referencing = self.db.getRecordset(TrusteeAccess, recordFilter={"roleId": roleId})
    if not referencing:
        return self.db.recordDelete(TrusteeRole, roleId)

    logger.error(f"Cannot delete role {roleId}: still in use")
    return False
|
|
|
|
# ===== Access CRUD =====
|
|
|
|
def createAccess(self, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
    """Create a new access record.

    The caller needs the system "create" permission and, unless the access
    level for "create" is ALL, the admin role for the target organisation.
    ``data`` is updated in place (mandateId, featureInstanceId when absent,
    and a generated UUID as ``id`` when none is supplied).

    Args:
        data: Field values of the access record; ``organisationId`` names the
            organisation the permission check is made against.

    Returns:
        The created access record, or None when a permission check fails or
        the database returned no record with an id.
    """
    # System RBAC comes first
    if not self.checkRbacPermission(TrusteeAccess, "create"):
        logger.warning(f"User {self.userId} lacks system permission to create access")
        return None

    organisationId = data.get("organisationId")

    # Feature-level check: admin role for this organisation, skipped for ALL access
    hasFullAccess = self.getRbacAccessLevel(TrusteeAccess, "create") == AccessLevel.ALL
    if not hasFullAccess and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    stored = self.db.recordCreate(TrusteeAccess, data)
    if not (stored and stored.get("id")):
        return None
    publicFields = {key: val for key, val in stored.items() if not key.startswith("_")}
    return TrusteeAccess(**publicFields)
|
|
|
|
def getAccess(self, accessId: str) -> Optional[TrusteeAccess]:
    """Fetch one access record by its ID.

    Returns:
        The access record built from the first matching row (keys starting
        with "_" are dropped), or None when no row matches.
    """
    matches = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    if matches:
        publicFields = {key: val for key, val in matches[0].items() if not key.startswith("_")}
        return TrusteeAccess(**publicFields)
    return None
|
|
|
|
def getAllAccess(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List access records with system RBAC plus feature-level filtering.

    Users whose "read" access level is ALL see every record returned by the
    system RBAC query. Everyone else only sees records of organisations for
    which one of their own access records carries the role "admin".

    Args:
        params: Optional pagination settings; only ``page`` (default 1) and
            ``pageSize`` (default 20) are used here.

    Returns:
        PaginatedResult with the page items, total record count and page count.
    """
    # Step 1: system RBAC filtering
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    # Step 2: feature-level filtering, skipped for ALL access
    if self.getRbacAccessLevel(TrusteeAccess, "read") != AccessLevel.ALL:
        ownAccess = self.getAllUserAccess(self.userId)
        adminOrgs = {entry.get("organisationId") for entry in ownAccess if entry.get("roleId") == "admin"}
        records = [rec for rec in records if rec.get("organisationId") in adminOrgs] if adminOrgs else []

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    pageSize = params.pageSize or 20
    offset = ((params.page or 1) - 1) * pageSize
    pageItems = records[offset:offset + pageSize]
    pageCount = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
    return PaginatedResult(items=pageItems, totalItems=totalItems, totalPages=pageCount)
|
|
|
|
def getAccessByOrganisation(self, organisationId: str) -> List[TrusteeAccess]:
    """Get all access records for a specific organisation.

    Requires the admin role for the organisation, unless the user's "read"
    access level is ALL. The ALL bypass matches getAllAccess/getAccessByUser
    (which already show every record to such users) and the create/update/
    delete methods for access records.

    Args:
        organisationId: ID of the organisation whose access records are wanted.

    Returns:
        The organisation's access records ordered by id, or an empty list when
        the user is not allowed to see them.
    """
    # Users with ALL access level bypass the feature-level permission check
    accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read")

    if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return []

    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        recordFilter={"organisationId": organisationId},
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )
    return [TrusteeAccess(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
|
|
|
|
def getAccessByUser(self, userId: str) -> List[TrusteeAccess]:
    """List the access records of one user.

    Users whose "read" access level is ALL get every record returned by the
    system RBAC query. Everyone else only gets records of organisations for
    which one of the CURRENT user's access records carries the role "admin".

    Args:
        userId: ID of the user whose access records are wanted.

    Returns:
        The visible access records, ordered by id.
    """
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeAccess,
        currentUser=self.currentUser,
        recordFilter={"userId": userId},
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    if self.getRbacAccessLevel(TrusteeAccess, "read") != AccessLevel.ALL:
        # Restrict to organisations administered by the current user
        ownAccess = self.getAllUserAccess(self.userId)
        adminOrgs = {entry.get("organisationId") for entry in ownAccess if entry.get("roleId") == "admin"}
        records = [rec for rec in records if rec.get("organisationId") in adminOrgs]

    result = []
    for rec in records:
        publicFields = {key: val for key, val in rec.items() if not key.startswith("_")}
        result.append(TrusteeAccess(**publicFields))
    return result
|
|
|
|
def updateAccess(self, accessId: str, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
    """Update an access record. Requires admin role for the organisation or ALL access level.

    When the update moves the record to another organisation, a user without
    ALL access must hold the admin role for the target organisation as well.
    Otherwise an admin of one organisation could grant access in another.

    Args:
        accessId: ID of the access record to update.
        data: New field values; its ``id`` entry is overwritten with
            ``accessId`` (``data`` is modified in place).

    Returns:
        The updated access record, or None when the record does not exist, a
        permission check fails, or the database returned nothing.
    """
    # Check system RBAC first
    if not self.checkRbacPermission(TrusteeAccess, "update"):
        logger.warning(f"User {self.userId} lacks system permission to update access")
        return None

    # Get existing access to check organisation
    existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    existing = existingRecords[0] if existingRecords else None

    if not existing:
        logger.warning(f"Access record {accessId} not found")
        return None

    organisationId = existing.get("organisationId")

    # Users with ALL access level bypass feature-level permission check
    accessLevel = self.getRbacAccessLevel(TrusteeAccess, "update")

    if accessLevel != AccessLevel.ALL:
        # Must have admin role for the organisation the record belongs to ...
        if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
            logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
            return None

        # ... and for the organisation it is being moved to, if that differs
        targetOrganisationId = data.get("organisationId", organisationId)
        if targetOrganisationId != organisationId and not self.checkUserTrusteePermission(self.userId, targetOrganisationId, "admin"):
            logger.warning(f"User {self.userId} lacks admin role for organisation {targetOrganisationId}")
            return None

    data["id"] = accessId
    updatedRecord = self.db.recordModify(TrusteeAccess, accessId, data)
    if not updatedRecord:
        return None
    return TrusteeAccess(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
|
|
|
|
def deleteAccess(self, accessId: str) -> bool:
    """Remove an access record.

    The caller needs the system "delete" permission and, unless the access
    level for "delete" is ALL, the admin role for the record's organisation.

    Returns:
        False when a permission check fails or the record does not exist;
        otherwise the result of the database delete call.
    """
    # System RBAC comes first
    if not self.checkRbacPermission(TrusteeAccess, "delete"):
        logger.warning(f"User {self.userId} lacks system permission to delete access")
        return False

    # The stored record tells us which organisation is affected
    found = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
    current = found[0] if found else None
    if not current:
        logger.warning(f"Access record {accessId} not found")
        return False

    organisationId = current.get("organisationId")

    # Feature-level check: admin role for this organisation, skipped for ALL access
    hasFullAccess = self.getRbacAccessLevel(TrusteeAccess, "delete") == AccessLevel.ALL
    if not hasFullAccess and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
        logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
        return False

    return self.db.recordDelete(TrusteeAccess, accessId)
|
|
|
|
# ===== Contract CRUD =====
|
|
|
|
def createContract(self, data: Dict[str, Any]) -> Optional[TrusteeContract]:
    """Create a new contract.

    ``data`` is updated in place (mandateId, featureInstanceId when absent,
    and a generated UUID as ``id`` when none is supplied).

    Args:
        data: Field values of the contract; ``organisationId`` names the
            organisation the combined permission check is made against.

    Returns:
        The created contract, or None when the combined permission check
        fails or the database returned no record with an id.
    """
    organisationId = data.get("organisationId")

    # Combined permission = system RBAC + feature-level
    allowed = self.checkCombinedPermission(TrusteeContract, "create", organisationId)
    if not allowed:
        logger.warning(f"User {self.userId} lacks permission to create contract for org {organisationId}")
        return None

    data["mandateId"] = self.mandateId
    data.setdefault("featureInstanceId", self.featureInstanceId)
    data["id"] = data.get("id") or str(uuid.uuid4())

    stored = self.db.recordCreate(TrusteeContract, data)
    if not (stored and stored.get("id")):
        return None
    publicFields = {key: val for key, val in stored.items() if not key.startswith("_")}
    return TrusteeContract(**publicFields)
|
|
|
|
def getContract(self, contractId: str) -> Optional[TrusteeContract]:
    """Fetch one contract by its ID.

    Returns:
        The contract built from the first matching record (keys starting with
        "_" are dropped), or None when no record matches.
    """
    matches = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    if matches:
        publicFields = {key: val for key, val in matches[0].items() if not key.startswith("_")}
        return TrusteeContract(**publicFields)
    return None
|
|
|
|
def getAllContracts(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """List contracts visible to the current user, optionally paginated.

    The records come from the system RBAC query (getRecordsetWithRBAC); this
    method itself applies no further feature-level filtering on top of it.

    Args:
        params: Optional pagination settings; only ``page`` (default 1) and
            ``pageSize`` (default 20) are used here.

    Returns:
        PaginatedResult with the page items, total record count and page count.
    """
    # Step 1: system RBAC filtering
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeContract,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    pageSize = params.pageSize or 20
    offset = ((params.page or 1) - 1) * pageSize
    pageItems = records[offset:offset + pageSize]
    pageCount = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
    return PaginatedResult(items=pageItems, totalItems=totalItems, totalPages=pageCount)
|
|
|
|
def getContractsByOrganisation(self, organisationId: str) -> List[TrusteeContract]:
    """List the contracts of one organisation.

    The records are fetched through getRecordsetWithRBAC, filtered on
    ``organisationId`` and ordered by "label".

    Args:
        organisationId: Organisation whose contracts are requested.

    Returns:
        One TrusteeContract per returned record; keys starting with "_" are
        stripped before model construction.
    """
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeContract,
        currentUser=self.currentUser,
        recordFilter={"organisationId": organisationId},
        orderBy="label",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    contracts = []
    for rawRecord in rbacRecords:
        publicFields = {key: value for key, value in rawRecord.items() if not key.startswith("_")}
        contracts.append(TrusteeContract(**publicFields))
    return contracts
|
|
|
|
def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[TrusteeContract]:
    """Modify an existing contract; its organisationId cannot be changed.

    Args:
        contractId: ID of the contract to modify.
        data: Fields to write. The dict is updated in place: its "id" entry is
            set to ``contractId`` before the record is modified.

    Returns:
        The updated TrusteeContract, or None when the contract does not exist,
        the combined permission check fails, ``data`` carries a different
        organisationId, or the modify call returns nothing.
    """
    matches = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    currentRecord = None
    if matches:
        currentRecord = matches[0]

    if not currentRecord:
        logger.warning(f"Contract {contractId} not found")
        return None

    # The permission check runs against the organisation stored on the record.
    organisationId = currentRecord.get("organisationId")
    mayUpdate = self.checkCombinedPermission(TrusteeContract, "update", organisationId)
    if not mayUpdate:
        logger.warning(f"User {self.userId} lacks permission to update contract in org {organisationId}")
        return None

    # Reject any attempt to move the contract to another organisation.
    orgChangeRequested = "organisationId" in data and data["organisationId"] != organisationId
    if orgChangeRequested:
        logger.error("Contract organisationId cannot be changed after creation")
        return None

    data["id"] = contractId
    modified = self.db.recordModify(TrusteeContract, contractId, data)
    if modified:
        return TrusteeContract(**{key: value for key, value in modified.items() if not key.startswith("_")})
    return None
|
|
|
|
def deleteContract(self, contractId: str) -> bool:
    """Remove a contract after a combined permission check.

    Args:
        contractId: ID of the contract to delete.

    Returns:
        False when the contract is not found or the permission check fails;
        otherwise the result of the database delete call.
    """
    found = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
    contractRecord = found[0] if found else None
    if not contractRecord:
        logger.warning(f"Contract {contractId} not found")
        return False

    # Permission is evaluated for the organisation stored on the record.
    organisationId = contractRecord.get("organisationId")
    if self.checkCombinedPermission(TrusteeContract, "delete", organisationId):
        return self.db.recordDelete(TrusteeContract, contractId)

    logger.warning(f"User {self.userId} lacks permission to delete contract in org {organisationId}")
    return False
|
|
|
|
# ===== Document CRUD =====
|
|
|
|
def createDocument(self, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
    """Create a new document.

    Note: organisationId and contractId removed - feature instance IS the organisation.
    Permission is checked via checkCombinedPermission without an organisation.

    Args:
        data: Document fields. The dict is updated in place: "mandateId" and
            "featureInstanceId" are overwritten with this interface's context
            and "id" is filled with a fresh UUID4 string when it is missing
            or empty.

    Returns:
        The created TrusteeDocument (keys starting with "_" stripped), or None
        when permission is denied or the create call returns no record with
        an id.
    """
    if not self.checkCombinedPermission(TrusteeDocument, "create"):
        logger.warning(f"User {self.userId} lacks permission to create document")
        return None

    # Auto-set context fields
    data["mandateId"] = self.mandateId
    data["featureInstanceId"] = self.featureInstanceId

    # uuid comes from the module-level import; no function-local import needed.
    data["id"] = data.get("id") or str(uuid.uuid4())

    createdRecord = self.db.recordCreate(TrusteeDocument, data)
    if not (createdRecord and createdRecord.get("id")):
        return None

    cleanedRecord = {k: v for k, v in createdRecord.items() if not k.startswith("_")}
    return TrusteeDocument(**cleanedRecord)
|
|
|
|
def getDocument(self, documentId: str) -> Optional[TrusteeDocument]:
    """Fetch the metadata of one document by ID.

    Args:
        documentId: ID of the document to look up.

    Returns:
        A TrusteeDocument built from the first matching record without the
        "documentData" key and without keys starting with "_", or None when
        the lookup returns no records.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    if matches:
        metadata = {}
        for fieldName, fieldValue in matches[0].items():
            # Skip internal fields and the binary payload.
            if fieldName.startswith("_") or fieldName == "documentData":
                continue
            metadata[fieldName] = fieldValue
        return TrusteeDocument(**metadata)
    return None
|
|
|
|
def getDocumentData(self, documentId: str) -> Optional[bytes]:
    """Return the binary content of a document.

    When the document record carries a "fileId", the content is read through
    the interfaceDbManagement interface (``getFileData``). Without a fileId
    the record's own "documentData" value is returned (legacy storage).

    Args:
        documentId: ID of the document whose content is requested.

    Returns:
        The file data, or None when the document is not found or the
        referenced file yields no data.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    record = matches[0] if matches else None
    if not record:
        return None

    fileId = record.get("fileId")
    if not fileId:
        # Legacy fallback: documentData was stored directly (for migration)
        return record.get("documentData")

    # Imported here rather than at module level, as in the original code.
    from modules.interfaces.interfaceDbManagement import getInterface as getDbInterface
    dbInterface = getDbInterface(self.currentUser, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId)
    fileData = dbInterface.getFileData(fileId)

    # NOTE(review): a zero-length payload is falsy and is therefore reported
    # as missing - confirm this is intended.
    if not fileData:
        logger.warning(f"File data not found for fileId {fileId}")
        return None
    return fileData
|
|
|
|
def getAllDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """Get all documents (metadata only), filtered, sorted and optionally paginated.

    Records are fetched through getRecordsetWithRBAC for TrusteeDocument,
    ordered by "documentName". Keys starting with "_" and the "documentData"
    key are removed, the remaining dicts are passed through
    ``self._applyFilters`` and ``self._applySorting`` together with ``params``
    and are then converted to TrusteeDocument objects.

    Args:
        params: Optional pagination parameters. When omitted, all items are
            returned as a single page. A missing, zero or negative ``page``
            falls back to 1 and a missing, zero or negative ``pageSize`` falls
            back to 20, so the slice bounds can never become negative.

    Returns:
        PaginatedResult with ``items``, ``totalItems`` and ``totalPages``;
        ``totalItems`` counts the items after filtering.
    """
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeDocument,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="documentName",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    # Keep plain dicts until filtering and sorting are done.
    cleanedRecords = [
        {k: v for k, v in record.items() if not k.startswith("_") and k != "documentData"}
        for record in records
    ]
    filteredRecords = self._applyFilters(cleanedRecords, params)
    sortedRecords = self._applySorting(filteredRecords, params)
    pydanticItems = [TrusteeDocument(**r) for r in sortedRecords]

    totalItems = len(pydanticItems)
    if not params:
        return PaginatedResult(items=pydanticItems, totalItems=totalItems, totalPages=1)

    # Non-positive values would yield negative slice indices, which Python
    # interprets relative to the end of the list; clamp them to the defaults.
    pageSize = params.pageSize if params.pageSize and params.pageSize > 0 else 20
    page = params.page if params.page and params.page > 0 else 1
    startIdx = (page - 1) * pageSize

    return PaginatedResult(
        items=pydanticItems[startIdx:startIdx + pageSize],
        totalItems=totalItems,
        totalPages=math.ceil(totalItems / pageSize)
    )
|
|
|
|
def getDocumentsByContract(self, contractId: str) -> List[TrusteeDocument]:
    """List the documents (metadata only) recorded for one contract.

    The records are fetched through getRecordsetWithRBAC, filtered on
    ``contractId`` and ordered by "documentName".

    Args:
        contractId: Contract whose documents are requested.

    Returns:
        One TrusteeDocument per returned record, built without the
        "documentData" key and without keys starting with "_".
    """
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteeDocument,
        currentUser=self.currentUser,
        recordFilter={"contractId": contractId},
        orderBy="documentName",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    return [
        TrusteeDocument(**{
            key: value
            for key, value in rawRecord.items()
            if not key.startswith("_") and key != "documentData"
        })
        for rawRecord in rbacRecords
    ]
|
|
|
|
def updateDocument(self, documentId: str, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
    """Modify an existing document.

    Note: organisationId and contractId removed - feature instance IS the organisation.

    Args:
        documentId: ID of the document to modify.
        data: Fields to write. The dict is updated in place: its "id" entry is
            set to ``documentId`` before the record is modified.

    Returns:
        The updated TrusteeDocument (without "documentData" and keys starting
        with "_"), or None when the document does not exist, the permission
        check fails, or the modify call returns nothing.
    """
    matches = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    currentRecord = matches[0] if matches else None
    if not currentRecord:
        logger.warning(f"Document {documentId} not found")
        return None

    # The record's creator is handed to the permission check.
    creatorId = currentRecord.get("_createdBy")
    mayUpdate = self.checkCombinedPermission(TrusteeDocument, "update", recordCreatedBy=creatorId)
    if not mayUpdate:
        logger.warning(f"User {self.userId} lacks permission to update document")
        return None

    data["id"] = documentId
    modified = self.db.recordModify(TrusteeDocument, documentId, data)
    if modified:
        metadata = {
            key: value
            for key, value in modified.items()
            if not key.startswith("_") and key != "documentData"
        }
        return TrusteeDocument(**metadata)
    return None
|
|
|
|
def deleteDocument(self, documentId: str) -> bool:
    """Remove a document together with its position-document links.

    The TrusteePositionDocument entries referencing the document are removed
    via ``_deletePositionDocumentLinksForDocument`` before the document itself
    is deleted.

    Args:
        documentId: ID of the document to delete.

    Returns:
        False when the document is not found or the permission check fails;
        otherwise the result of the database delete call.
    """
    found = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
    documentRecord = found[0] if found else None
    if not documentRecord:
        logger.warning(f"Document {documentId} not found")
        return False

    # The record's creator is handed to the permission check.
    creatorId = documentRecord.get("_createdBy")
    if self.checkCombinedPermission(TrusteeDocument, "delete", recordCreatedBy=creatorId):
        self._deletePositionDocumentLinksForDocument(documentId)
        return self.db.recordDelete(TrusteeDocument, documentId)

    logger.warning(f"User {self.userId} lacks permission to delete document")
    return False
|
|
|
|
# ===== Position CRUD =====
|
|
|
|
def createPosition(self, data: Dict[str, Any]) -> Optional[TrusteePosition]:
    """Create a new position.

    Note: organisationId and contractId removed - feature instance IS the organisation.
    Permission is checked via checkCombinedPermission without an organisation.

    Args:
        data: Position fields. The dict is updated in place: "mandateId" and
            "featureInstanceId" are overwritten with this interface's context,
            "id" is filled with a fresh UUID4 string when missing or empty,
            and "vatAmount" is computed as
            ``bookingAmount * vatPercentage / 100`` when it is missing, None
            or 0. A missing or None bookingAmount / vatPercentage counts as 0.

    Returns:
        The created TrusteePosition (keys starting with "_" stripped), or None
        when permission is denied or the create call returns no record with
        an id.
    """
    if not self.checkCombinedPermission(TrusteePosition, "create"):
        logger.warning(f"User {self.userId} lacks permission to create position")
        return None

    # Auto-set context fields
    data["mandateId"] = self.mandateId
    data["featureInstanceId"] = self.featureInstanceId

    # Calculate VAT amount if not provided. An explicit None is treated like
    # a missing value so that None operands cannot raise a TypeError.
    vatAmount = data.get("vatAmount")
    if vatAmount is None or vatAmount == 0:
        bookingAmount = data.get("bookingAmount") or 0
        vatPercentage = data.get("vatPercentage") or 0
        data["vatAmount"] = bookingAmount * vatPercentage / 100

    # uuid comes from the module-level import; no function-local import needed.
    data["id"] = data.get("id") or str(uuid.uuid4())

    createdRecord = self.db.recordCreate(TrusteePosition, data)
    if not (createdRecord and createdRecord.get("id")):
        return None
    return TrusteePosition(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
|
|
|
|
def getPosition(self, positionId: str) -> Optional[TrusteePosition]:
    """Fetch one position by its ID.

    Args:
        positionId: ID of the position to look up.

    Returns:
        A TrusteePosition built from the first matching record, or None when
        the lookup returns no records.
    """
    matches = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    if matches:
        # Keys starting with "_" are dropped before the model is built.
        publicFields = {}
        for fieldName, fieldValue in matches[0].items():
            if not fieldName.startswith("_"):
                publicFields[fieldName] = fieldValue
        return TrusteePosition(**publicFields)
    return None
|
|
|
|
def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """Get all positions, filtered, sorted and optionally paginated.

    Records are fetched through getRecordsetWithRBAC for TrusteePosition,
    ordered by "valuta". Keys starting with "_" are removed, the remaining
    dicts are passed through ``self._applyFilters`` and ``self._applySorting``
    together with ``params`` and are then converted to TrusteePosition
    objects.

    Args:
        params: Optional pagination parameters. When omitted, all items are
            returned as a single page. A missing, zero or negative ``page``
            falls back to 1 and a missing, zero or negative ``pageSize`` falls
            back to 20, so the slice bounds can never become negative.

    Returns:
        PaginatedResult with ``items``, ``totalItems`` and ``totalPages``;
        ``totalItems`` counts the items after filtering.
    """
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="valuta",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    # Keep plain dicts until filtering and sorting are done.
    cleanedRecords = [
        {k: v for k, v in record.items() if not k.startswith("_")}
        for record in records
    ]
    filteredRecords = self._applyFilters(cleanedRecords, params)
    sortedRecords = self._applySorting(filteredRecords, params)
    pydanticItems = [TrusteePosition(**r) for r in sortedRecords]

    totalItems = len(pydanticItems)
    if not params:
        return PaginatedResult(items=pydanticItems, totalItems=totalItems, totalPages=1)

    # Non-positive values would yield negative slice indices, which Python
    # interprets relative to the end of the list; clamp them to the defaults.
    pageSize = params.pageSize if params.pageSize and params.pageSize > 0 else 20
    page = params.page if params.page and params.page > 0 else 1
    startIdx = (page - 1) * pageSize

    return PaginatedResult(
        items=pydanticItems[startIdx:startIdx + pageSize],
        totalItems=totalItems,
        totalPages=math.ceil(totalItems / pageSize)
    )
|
|
|
|
def getPositionsByContract(self, contractId: str) -> List[TrusteePosition]:
    """List the positions recorded for one contract.

    The records are fetched through getRecordsetWithRBAC, filtered on
    ``contractId`` and ordered by "valuta".

    Args:
        contractId: Contract whose positions are requested.

    Returns:
        One TrusteePosition per returned record; keys starting with "_" are
        stripped before model construction.
    """
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        recordFilter={"contractId": contractId},
        orderBy="valuta",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    positions = []
    for rawRecord in rbacRecords:
        publicFields = {key: value for key, value in rawRecord.items() if not key.startswith("_")}
        positions.append(TrusteePosition(**publicFields))
    return positions
|
|
|
|
def getPositionsByOrganisation(self, organisationId: str) -> List[TrusteePosition]:
    """List the positions recorded for one organisation.

    The records are fetched through getRecordsetWithRBAC, filtered on
    ``organisationId`` and ordered by "valuta".

    Args:
        organisationId: Organisation whose positions are requested.

    Returns:
        One TrusteePosition per returned record; keys starting with "_" are
        stripped before model construction.
    """
    rbacRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePosition,
        currentUser=self.currentUser,
        recordFilter={"organisationId": organisationId},
        orderBy="valuta",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    positions = []
    for rawRecord in rbacRecords:
        publicFields = {key: value for key, value in rawRecord.items() if not key.startswith("_")}
        positions.append(TrusteePosition(**publicFields))
    return positions
|
|
|
|
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]:
    """Modify an existing position.

    Note: organisationId and contractId removed - feature instance IS the organisation.

    Args:
        positionId: ID of the position to modify.
        data: Fields to write. The dict is updated in place: its "id" entry is
            set to ``positionId`` before the record is modified.

    Returns:
        The updated TrusteePosition (keys starting with "_" stripped), or None
        when the position does not exist, the permission check fails, or the
        modify call returns nothing.
    """
    matches = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    currentRecord = matches[0] if matches else None
    if not currentRecord:
        logger.warning(f"Position {positionId} not found")
        return None

    # The record's creator is handed to the permission check.
    creatorId = currentRecord.get("_createdBy")
    mayUpdate = self.checkCombinedPermission(TrusteePosition, "update", recordCreatedBy=creatorId)
    if not mayUpdate:
        logger.warning(f"User {self.userId} lacks permission to update position")
        return None

    data["id"] = positionId
    modified = self.db.recordModify(TrusteePosition, positionId, data)
    if modified:
        return TrusteePosition(**{key: value for key, value in modified.items() if not key.startswith("_")})
    return None
|
|
|
|
def deletePosition(self, positionId: str) -> bool:
    """Remove a position together with its position-document links.

    The TrusteePositionDocument entries referencing the position are removed
    via ``_deletePositionDocumentLinksForPosition`` before the position itself
    is deleted.

    Args:
        positionId: ID of the position to delete.

    Returns:
        False when the position is not found or the permission check fails;
        otherwise the result of the database delete call.
    """
    found = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
    positionRecord = found[0] if found else None
    if not positionRecord:
        logger.warning(f"Position {positionId} not found")
        return False

    # The record's creator is handed to the permission check.
    creatorId = positionRecord.get("_createdBy")
    if self.checkCombinedPermission(TrusteePosition, "delete", recordCreatedBy=creatorId):
        self._deletePositionDocumentLinksForPosition(positionId)
        return self.db.recordDelete(TrusteePosition, positionId)

    logger.warning(f"User {self.userId} lacks permission to delete position")
    return False
|
|
|
|
# ===== Position-Document Link CRUD =====
|
|
|
|
def createPositionDocument(self, data: Dict[str, Any]) -> Optional[TrusteePositionDocument]:
    """Create a new position-document link.

    Note: organisationId and contractId removed - feature instance IS the organisation.
    Permission is checked via checkCombinedPermission without an organisation.

    Args:
        data: Link fields. The dict is updated in place: "mandateId" and
            "featureInstanceId" are overwritten with this interface's context
            and "id" is filled with a fresh UUID4 string when it is missing
            or empty.

    Returns:
        The created TrusteePositionDocument (keys starting with "_" stripped),
        or None when permission is denied or the create call returns no
        record with an id.
    """
    if not self.checkCombinedPermission(TrusteePositionDocument, "create"):
        logger.warning(f"User {self.userId} lacks permission to create position-document link")
        return None

    # Auto-set context fields
    data["mandateId"] = self.mandateId
    data["featureInstanceId"] = self.featureInstanceId

    # uuid comes from the module-level import; no function-local import needed.
    data["id"] = data.get("id") or str(uuid.uuid4())

    createdRecord = self.db.recordCreate(TrusteePositionDocument, data)
    if not (createdRecord and createdRecord.get("id")):
        return None
    return TrusteePositionDocument(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
|
|
|
|
def getPositionDocument(self, linkId: str) -> Optional[TrusteePositionDocument]:
    """Fetch one position-document link by its ID.

    Args:
        linkId: ID of the link to look up.

    Returns:
        A TrusteePositionDocument built from the first matching record, or
        None when the lookup returns no records.
    """
    matches = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
    if matches:
        # Keys starting with "_" are dropped before the model is built.
        publicFields = {}
        for fieldName, fieldValue in matches[0].items():
            if not fieldName.startswith("_"):
                publicFields[fieldName] = fieldValue
        return TrusteePositionDocument(**publicFields)
    return None
|
|
|
|
def updatePositionDocument(self, linkId: str, data: Dict[str, Any]) -> Optional[TrusteePositionDocument]:
    """Modify a position-document link of the current feature instance.

    Args:
        linkId: ID of the link to modify.
        data: Fields to write. The dict is updated in place: "id", "mandateId"
            and "featureInstanceId" are removed before the record is modified.

    Returns:
        The updated TrusteePositionDocument (keys starting with "_" stripped),
        or None when permission is denied, the link is not found, the link's
        featureInstanceId differs from this interface's, or the modify call
        returns no record with an id.
    """
    mayUpdate = self.checkCombinedPermission(TrusteePositionDocument, "update")
    if not mayUpdate:
        logger.warning(f"User {self.userId} lacks permission to update position-document link")
        return None

    matches = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
    if not matches:
        logger.warning(f"Position-document link {linkId} not found")
        return None

    # Only links of this feature instance may be touched.
    linkRecord = matches[0]
    if linkRecord.get("featureInstanceId") != self.featureInstanceId:
        logger.warning(f"Link {linkId} belongs to different instance")
        return None

    # Prevent changing context fields
    for protectedKey in ("id", "mandateId", "featureInstanceId"):
        data.pop(protectedKey, None)

    modified = self.db.recordModify(TrusteePositionDocument, linkId, data)
    if not (modified and modified.get("id")):
        return None
    return TrusteePositionDocument(**{key: value for key, value in modified.items() if not key.startswith("_")})
|
|
|
|
def getAllPositionDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
    """Get all position-document links, optionally paginated.

    Records are fetched through getRecordsetWithRBAC for
    TrusteePositionDocument with ``enrichPermissions=True``, ordered by "id".
    Items are handed back exactly as that call returns them (no conversion to
    TrusteePositionDocument).

    Args:
        params: Optional pagination parameters. When omitted, all records are
            returned as a single page. A missing, zero or negative ``page``
            falls back to 1 and a missing, zero or negative ``pageSize`` falls
            back to 20, so the slice bounds can never become negative.

    Returns:
        PaginatedResult with ``items``, ``totalItems`` and ``totalPages``.
    """
    records = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePositionDocument,
        currentUser=self.currentUser,
        recordFilter=None,
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        enrichPermissions=True,
        featureCode=self.FEATURE_CODE
    )

    totalItems = len(records)
    if not params:
        return PaginatedResult(items=records, totalItems=totalItems, totalPages=1)

    # Non-positive values would yield negative slice indices, which Python
    # interprets relative to the end of the list; clamp them to the defaults.
    pageSize = params.pageSize if params.pageSize and params.pageSize > 0 else 20
    page = params.page if params.page and params.page > 0 else 1
    startIdx = (page - 1) * pageSize

    return PaginatedResult(
        items=records[startIdx:startIdx + pageSize],
        totalItems=totalItems,
        totalPages=math.ceil(totalItems / pageSize)
    )
|
|
|
|
def getDocumentsForPosition(self, positionId: str) -> List[TrusteePositionDocument]:
    """List the position-document links of one position.

    The records are fetched through getRecordsetWithRBAC, filtered on
    ``positionId`` and ordered by "id".

    Args:
        positionId: Position whose links are requested.

    Returns:
        One TrusteePositionDocument per returned record; keys starting with
        "_" are stripped before model construction.
    """
    linkRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePositionDocument,
        currentUser=self.currentUser,
        recordFilter={"positionId": positionId},
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    linkModels = []
    for rawLink in linkRecords:
        publicFields = {key: value for key, value in rawLink.items() if not key.startswith("_")}
        linkModels.append(TrusteePositionDocument(**publicFields))
    return linkModels
|
|
|
|
def getPositionsForDocument(self, documentId: str) -> List[TrusteePositionDocument]:
    """List the position-document links of one document.

    The records are fetched through getRecordsetWithRBAC, filtered on
    ``documentId`` and ordered by "id".

    Args:
        documentId: Document whose links are requested.

    Returns:
        One TrusteePositionDocument per returned record; keys starting with
        "_" are stripped before model construction.
    """
    linkRecords = getRecordsetWithRBAC(
        connector=self.db,
        modelClass=TrusteePositionDocument,
        currentUser=self.currentUser,
        recordFilter={"documentId": documentId},
        orderBy="id",
        mandateId=self.mandateId,
        featureInstanceId=self.featureInstanceId,
        featureCode=self.FEATURE_CODE
    )

    linkModels = []
    for rawLink in linkRecords:
        publicFields = {key: value for key, value in rawLink.items() if not key.startswith("_")}
        linkModels.append(TrusteePositionDocument(**publicFields))
    return linkModels
|
|
|
|
def deletePositionDocument(self, linkId: str) -> bool:
    """Remove a position-document link.

    Note: organisationId and contractId removed - feature instance IS the organisation.

    Args:
        linkId: ID of the link to delete.

    Returns:
        False when the link is not found or the permission check fails;
        otherwise the result of the database delete call.
    """
    found = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
    linkRecord = found[0] if found else None
    if not linkRecord:
        logger.warning(f"Position-document link {linkId} not found")
        return False

    # The record's creator is handed to the permission check.
    creatorId = linkRecord.get("_createdBy")
    if self.checkCombinedPermission(TrusteePositionDocument, "delete", recordCreatedBy=creatorId):
        return self.db.recordDelete(TrusteePositionDocument, linkId)

    logger.warning(f"User {self.userId} lacks permission to delete position-document link")
    return False
|
|
|
|
def _deletePositionDocumentLinksForDocument(self, documentId: str) -> None:
    """Remove every TrusteePositionDocument entry whose documentId matches.

    Args:
        documentId: Document whose link entries are removed.
    """
    linkRecords = self.db.getRecordset(TrusteePositionDocument, recordFilter={"documentId": documentId})
    for linkRecord in linkRecords:
        linkId = linkRecord.get("id")
        if not linkId:
            # Entries without an id cannot be addressed for deletion.
            continue
        self.db.recordDelete(TrusteePositionDocument, linkId)
|
|
|
|
def _deletePositionDocumentLinksForPosition(self, positionId: str) -> None:
    """Remove every TrusteePositionDocument entry whose positionId matches.

    Args:
        positionId: Position whose link entries are removed.
    """
    linkRecords = self.db.getRecordset(TrusteePositionDocument, recordFilter={"positionId": positionId})
    for linkRecord in linkRecords:
        linkId = linkRecord.get("id")
        if not linkId:
            # Entries without an id cannot be addressed for deletion.
            continue
        self.db.recordDelete(TrusteePositionDocument, linkId)
|
|
|
|
# ===== Trustee-specific Access Check =====
|
|
|
|
def getUserAccessForOrganisation(self, userId: str, organisationId: str) -> List[Dict[str, Any]]:
    """Return the TrusteeAccess records of a user within one organisation.

    Args:
        userId: User whose access records are requested.
        organisationId: Organisation to restrict the lookup to.

    Returns:
        The recordset delivered by the database for that user/organisation
        filter.
    """
    accessFilter = {"userId": userId, "organisationId": organisationId}
    return self.db.getRecordset(TrusteeAccess, accessFilter)
|
|
|
|
def getAllUserAccess(self, userId: str) -> List[Dict[str, Any]]:
    """Return every TrusteeAccess record of a user, regardless of organisation.

    Args:
        userId: User whose access records are requested.

    Returns:
        The recordset delivered by the database for that user filter.
    """
    userFilter = {"userId": userId}
    return self.db.getRecordset(TrusteeAccess, userFilter)
|
|
|
|
def getUserTrusteeRoles(self, userId: str, organisationId: str, contractId: Optional[str] = None) -> List[str]:
    """
    Collect the distinct trustee roles of a user within an organisation.

    An access record contributes its role when any of these holds:
    - the record has no contractId (it is not tied to a single contract),
    - no contractId was requested (``contractId`` is None),
    - the record's contractId equals the requested, non-empty ``contractId``.

    Args:
        userId: User ID to check
        organisationId: Organisation ID
        contractId: Optional contract ID for contract-specific access

    Returns:
        Role IDs in order of first appearance, without duplicates
    """
    collectedRoles = []

    for entry in self.getUserAccessForOrganisation(userId, organisationId):
        entryContractId = entry.get("contractId")
        entryRoleId = entry.get("roleId")

        applies = (
            entryContractId is None
            or contractId is None
            or (contractId and entryContractId == contractId)
        )
        if applies and entryRoleId not in collectedRoles:
            collectedRoles.append(entryRoleId)

    return collectedRoles
|
|
|
|
def checkUserTrusteePermission(
    self,
    userId: str,
    organisationId: str,
    requiredRole: str,
    contractId: Optional[str] = None
) -> bool:
    """
    Tell whether a user holds a given role in an organisation (optionally per contract).

    The decision is a membership test against the roles returned by
    ``getUserTrusteeRoles`` for the same user, organisation and contract.

    Args:
        userId: User ID to check
        organisationId: Organisation ID
        requiredRole: Required role (userreport, admin, operate)
        contractId: Optional contract ID for contract-specific access

    Returns:
        True if the required role is among the user's roles
    """
    return requiredRole in self.getUserTrusteeRoles(userId, organisationId, contractId)
|
|
|
|
def checkCombinedPermission(
    self,
    modelClass: type,
    operation: str,
    organisationId: Optional[str] = None,
    contractId: Optional[str] = None,
    recordCreatedBy: Optional[str] = None
) -> bool:
    """
    Evaluate system RBAC first, then the trustee roles of the current user.

    Decision order:
    1. System access level NONE denies; ALL grants.
    2. Without an organisationId the system level alone decides (granted).
    3. Otherwise the user's trustee roles for the organisation/contract apply:
       - no roles: denied
       - "admin": granted
       - "operate": granted for contracts, documents, positions and
         position-document links; organisations only for "read"
       - "userreport": documents, positions and position-document links are
         granted for "create" or when ``recordCreatedBy`` equals the current
         user; organisations and contracts only for "read"

    Args:
        modelClass: The model class (e.g., TrusteeContract)
        operation: Operation type (read, create, update, delete)
        organisationId: Optional organisation ID for feature-level check
        contractId: Optional contract ID for contract-level filtering
        recordCreatedBy: Optional creator ID for userreport role checks

    Returns:
        True if user has permission
    """
    systemLevel = self.getRbacAccessLevel(modelClass, operation)
    if systemLevel == AccessLevel.NONE:
        return False
    if systemLevel == AccessLevel.ALL or organisationId is None:
        return True

    grantedRoles = self.getUserTrusteeRoles(self.userId, organisationId, contractId)
    if not grantedRoles:
        return False
    if "admin" in grantedRoles:
        return True

    # Models for which "userreport" access depends on the record's creator.
    ownableModels = (TrusteeDocument, TrusteePosition, TrusteePositionDocument)

    if "operate" in grantedRoles:
        if modelClass in (TrusteeContract,) + ownableModels:
            return True
        if modelClass == TrusteeOrganisation and operation == "read":
            return True

    if "userreport" in grantedRoles:
        if modelClass in ownableModels:
            if operation == "create" or recordCreatedBy == self.userId:
                return True
        if modelClass in (TrusteeOrganisation, TrusteeContract) and operation == "read":
            return True

    return False
|