gateway/modules/features/trustee/interfaceFeatureTrustee.py
ValueOn AG df4c60fc99 fixes
2026-01-24 18:01:28 +01:00

1430 lines
59 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to Trustee database.
Manages trustee organisations, roles, access, contracts, documents, and positions.
"""
import logging
import math
import uuid
from typing import Dict, Any, List, Optional, Union
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
from modules.security.rbac import RbacClass
from modules.datamodels.datamodelUam import User, AccessLevel
from modules.datamodels.datamodelRbac import AccessRuleContext
from .datamodelFeatureTrustee import (
TrusteeOrganisation,
TrusteeRole,
TrusteeAccess,
TrusteeContract,
TrusteeDocument,
TrusteePosition,
TrusteePositionDocument,
)
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
logger = logging.getLogger(__name__)
# Singleton factory for TrusteeObjects instances per context
_trusteeInterfaces = {}
def getInterface(currentUser: User, mandateId: Optional[Union[str, uuid.UUID]] = None, featureInstanceId: Optional[str] = None) -> "TrusteeObjects":
"""Get or create a TrusteeObjects instance for the given user context.
Args:
currentUser: The authenticated user
mandateId: The mandate ID from RequestContext (X-Mandate-Id header). Required.
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header).
"""
global _trusteeInterfaces
if not currentUser or not currentUser.id:
raise ValueError("Valid user context required")
effectiveMandateId = str(mandateId) if mandateId else None
effectiveFeatureInstanceId = str(featureInstanceId) if featureInstanceId else None
# Include featureInstanceId in cache key for proper isolation
cacheKey = f"{currentUser.id}_{effectiveMandateId}_{effectiveFeatureInstanceId}"
if cacheKey not in _trusteeInterfaces:
_trusteeInterfaces[cacheKey] = TrusteeObjects(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
else:
# Update user context if needed
_trusteeInterfaces[cacheKey].setUserContext(currentUser, mandateId=effectiveMandateId, featureInstanceId=effectiveFeatureInstanceId)
return _trusteeInterfaces[cacheKey]
class TrusteeObjects:
"""
Interface to Trustee database.
Manages trustee organisations, roles, access, contracts, documents, and positions.
"""
def __init__(self, currentUser: Optional[User] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
"""Initializes the Trustee Interface.
Args:
currentUser: The authenticated user
mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
"""
self.currentUser = currentUser
self.userId = currentUser.id if currentUser else None
# Use mandateId from parameter (Request-Context), not from user object
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.rbac = None
# Initialize database
self._initializeDatabase()
# Set user context if provided
if currentUser:
self.setUserContext(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
def setUserContext(self, currentUser: User, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
"""Sets the user context for the interface.
Args:
currentUser: The authenticated user
mandateId: The mandate ID from RequestContext (X-Mandate-Id header)
featureInstanceId: The feature instance ID from RequestContext (X-Feature-Instance-Id header)
"""
if not currentUser:
logger.info("Initializing interface without user context")
return
self.currentUser = currentUser
self.userId = currentUser.id
# Use mandateId from parameter (Request-Context), not from user object
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
if not self.userId:
raise ValueError("Invalid user context: id is required")
# Note: mandateId is ALWAYS optional here - it comes from Request-Context, not from User.
# Users are NOT assigned to mandates by design - they get mandate context from the request.
# sysAdmin users can additionally perform cross-mandate operations.
# Without mandateId, operations will be filtered to accessible mandates via RBAC.
self.userLanguage = currentUser.language
# Initialize RBAC interface
from modules.security.rootAccess import getRootDbAppConnector
dbApp = getRootDbAppConnector()
self.rbac = RbacClass(self.db, dbApp=dbApp)
# Update database context
self.db.updateContext(self.userId)
def __del__(self):
"""Cleanup method to close database connection."""
if hasattr(self, "db") and self.db is not None:
try:
self.db.close()
except Exception as e:
logger.error(f"Error closing database connection: {e}")
def _initializeDatabase(self):
"""Initializes the database connection directly."""
try:
dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
dbDatabase = "poweron_trustee"
dbUser = APP_CONFIG.get("DB_USER")
dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
self.db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,
userId=self.userId,
)
self.db.initDbSystem()
logger.info(f"Trustee database initialized successfully for user {self.userId}")
except Exception as e:
logger.error(f"Failed to initialize Trustee database: {str(e)}")
raise
def checkRbacPermission(
self,
modelClass: type,
operation: str,
recordId: Optional[str] = None
) -> bool:
"""Check RBAC permission for a specific operation on a table."""
if not self.rbac or not self.currentUser:
return False
tableName = modelClass.__name__
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
)
if not permissions.view:
return False
permLevel = getattr(permissions, operation, AccessLevel.NONE)
if permLevel == AccessLevel.NONE:
return False
return True
def getRbacAccessLevel(self, modelClass: type, operation: str) -> AccessLevel:
"""Get the RBAC access level for a specific operation on a table.
Returns:
AccessLevel (ALL, GROUP, MY, NONE) - determines what filtering is needed
"""
if not self.rbac or not self.currentUser:
return AccessLevel.NONE
tableName = modelClass.__name__
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
)
if not permissions.view:
return AccessLevel.NONE
return getattr(permissions, operation, AccessLevel.NONE)
# ===== Organisation CRUD =====
def createOrganisation(self, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
"""Create a new organisation."""
if not self.checkRbacPermission(TrusteeOrganisation, "create"):
logger.warning(f"User {self.userId} lacks permission to create organisation")
return None
# Set mandateId and featureInstanceId from context for proper data isolation
data["mandateId"] = self.mandateId
if "featureInstanceId" not in data:
data["featureInstanceId"] = self.featureInstanceId
# Validate ID format (alphanumeric, hyphens, underscores, 3-50 chars)
orgId = data.get("id", "")
if not orgId or len(orgId) < 3 or len(orgId) > 50:
logger.error(f"Invalid organisation ID length: {len(orgId)}")
return None
import re
if not re.match(r'^[a-zA-Z0-9_-]+$', orgId):
logger.error(f"Invalid organisation ID format: {orgId}")
return None
createdRecord = self.db.recordCreate(TrusteeOrganisation, data)
if createdRecord and createdRecord.get("id"):
return TrusteeOrganisation(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
return None
def getOrganisation(self, orgId: str) -> Optional[TrusteeOrganisation]:
"""Get a single organisation by ID."""
records = self.db.getRecordset(TrusteeOrganisation, recordFilter={"id": orgId})
if not records:
return None
return TrusteeOrganisation(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def getAllOrganisations(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all organisations with RBAC filtering.
Note: Organisations are managed at system level (by mandate).
Feature-level filtering (trustee.access) is NOT applied here because:
- Admin users with system RBAC can manage all orgs in their mandate
- trustee.access grants access to specific orgs for other users
- New organisations wouldn't be visible without an access record
"""
# Debug: Log user info and permissions
logger.debug(f"getAllOrganisations called for user {self.userId}, mandateId: {self.mandateId}")
# System RBAC filtering (filters by mandate for GROUP access level)
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeOrganisation,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
)
logger.debug(f"getAllOrganisations: getRecordsetWithRBAC returned {len(records)} records")
# Apply pagination
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = records[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = records
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def updateOrganisation(self, orgId: str, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
"""Update an organisation."""
if not self.checkRbacPermission(TrusteeOrganisation, "update"):
logger.warning(f"User {self.userId} lacks permission to update organisation")
return None
# ID cannot be changed after creation
if "id" in data and data["id"] != orgId:
logger.error("Organisation ID cannot be changed")
return None
data["id"] = orgId
updatedRecord = self.db.recordModify(TrusteeOrganisation, orgId, data)
if not updatedRecord:
return None
return TrusteeOrganisation(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
def deleteOrganisation(self, orgId: str) -> bool:
"""Delete an organisation."""
if not self.checkRbacPermission(TrusteeOrganisation, "delete"):
logger.warning(f"User {self.userId} lacks permission to delete organisation")
return False
return self.db.recordDelete(TrusteeOrganisation, orgId)
# ===== Role CRUD =====
def createRole(self, data: Dict[str, Any]) -> Optional[TrusteeRole]:
"""Create a new role (sysadmin only)."""
if not self.checkRbacPermission(TrusteeRole, "create"):
logger.warning(f"User {self.userId} lacks permission to create role")
return None
data["mandateId"] = self.mandateId
if "featureInstanceId" not in data:
data["featureInstanceId"] = self.featureInstanceId
roleId = data.get("id", "")
if not roleId:
logger.error("Role ID is required")
return None
createdRecord = self.db.recordCreate(TrusteeRole, data)
if createdRecord and createdRecord.get("id"):
return TrusteeRole(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
return None
def getRole(self, roleId: str) -> Optional[TrusteeRole]:
"""Get a single role by ID."""
records = self.db.getRecordset(TrusteeRole, recordFilter={"id": roleId})
if not records:
return None
return TrusteeRole(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def getAllRoles(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all roles with RBAC filtering.
Note: Roles are available to all users with trustee access.
They are not filtered by organisation since they define the role types.
Users with ALL access level see all roles; others need trustee.access records.
"""
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeRole,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
)
# Users with ALL access level (from system RBAC) see all roles
# Others need at least one trustee.access record
accessLevel = self.getRbacAccessLevel(TrusteeRole, "read")
if accessLevel != AccessLevel.ALL:
userAccess = self.getAllUserAccess(self.userId)
if not userAccess:
records = [] # No trustee access at all
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = records[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = records
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def updateRole(self, roleId: str, data: Dict[str, Any]) -> Optional[TrusteeRole]:
"""Update a role (sysadmin only)."""
if not self.checkRbacPermission(TrusteeRole, "update"):
logger.warning(f"User {self.userId} lacks permission to update role")
return None
data["id"] = roleId
updatedRecord = self.db.recordModify(TrusteeRole, roleId, data)
if not updatedRecord:
return None
return TrusteeRole(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
def deleteRole(self, roleId: str) -> bool:
"""Delete a role (sysadmin only, not if in use)."""
if not self.checkRbacPermission(TrusteeRole, "delete"):
logger.warning(f"User {self.userId} lacks permission to delete role")
return False
# Check if role is in use
accessRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"roleId": roleId})
if accessRecords:
logger.error(f"Cannot delete role {roleId}: still in use")
return False
return self.db.recordDelete(TrusteeRole, roleId)
# ===== Access CRUD =====
def createAccess(self, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
"""Create a new access record. Requires admin role for the organisation or ALL access level."""
# Check system RBAC first
if not self.checkRbacPermission(TrusteeAccess, "create"):
logger.warning(f"User {self.userId} lacks system permission to create access")
return None
organisationId = data.get("organisationId")
# Users with ALL access level bypass feature-level permission check
accessLevel = self.getRbacAccessLevel(TrusteeAccess, "create")
# Check feature-level permission - must have admin role for this organisation (unless ALL access)
if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return None
data["mandateId"] = self.mandateId
if "featureInstanceId" not in data:
data["featureInstanceId"] = self.featureInstanceId
import uuid
accessId = data.get("id") or str(uuid.uuid4())
data["id"] = accessId
createdRecord = self.db.recordCreate(TrusteeAccess, data)
if createdRecord and createdRecord.get("id"):
return TrusteeAccess(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
return None
def getAccess(self, accessId: str) -> Optional[TrusteeAccess]:
"""Get a single access record by ID."""
records = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
if not records:
return None
return TrusteeAccess(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def getAllAccess(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all access records with RBAC filtering + feature-level filtering.
Users with ALL access level see all access records.
Others can only see access records for organisations they have admin access to.
"""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
)
# Users with ALL access level (from system RBAC) see all records
accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read")
if accessLevel != AccessLevel.ALL:
# Step 2: Feature-level filtering - only see access for organisations where user is admin
userAccess = self.getAllUserAccess(self.userId)
# Get organisations where user has admin role
adminOrgs = set()
for access in userAccess:
if access.get("roleId") == "admin":
adminOrgs.add(access.get("organisationId"))
# Filter records to only show those in admin organisations
if adminOrgs:
records = [r for r in records if r.get("organisationId") in adminOrgs]
else:
records = []
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = records[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = records
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getAccessByOrganisation(self, organisationId: str) -> List[TrusteeAccess]:
"""Get all access records for a specific organisation.
Requires admin role for the organisation.
"""
# Check if user has admin access for this organisation
if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return []
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="id"
)
return [TrusteeAccess(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
def getAccessByUser(self, userId: str) -> List[TrusteeAccess]:
"""Get all access records for a specific user.
Users with ALL access level see all access records.
Others can only see access records for organisations where they have admin role.
"""
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter={"userId": userId},
orderBy="id"
)
# Users with ALL access level (from system RBAC) see all records
accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read")
if accessLevel == AccessLevel.ALL:
return [TrusteeAccess(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
# Filter to only organisations where current user has admin role
userAccess = self.getAllUserAccess(self.userId)
adminOrgs = set()
for access in userAccess:
if access.get("roleId") == "admin":
adminOrgs.add(access.get("organisationId"))
filtered = [r for r in records if r.get("organisationId") in adminOrgs]
return [TrusteeAccess(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
def updateAccess(self, accessId: str, data: Dict[str, Any]) -> Optional[TrusteeAccess]:
"""Update an access record. Requires admin role for the organisation or ALL access level."""
# Check system RBAC first
if not self.checkRbacPermission(TrusteeAccess, "update"):
logger.warning(f"User {self.userId} lacks system permission to update access")
return None
# Get existing access to check organisation
existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Access record {accessId} not found")
return None
organisationId = existing.get("organisationId")
# Users with ALL access level bypass feature-level permission check
accessLevel = self.getRbacAccessLevel(TrusteeAccess, "update")
# Check feature-level permission - must have admin role for this organisation (unless ALL access)
if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return None
data["id"] = accessId
updatedRecord = self.db.recordModify(TrusteeAccess, accessId, data)
if not updatedRecord:
return None
return TrusteeAccess(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
def deleteAccess(self, accessId: str) -> bool:
"""Delete an access record. Requires admin role for the organisation or ALL access level."""
# Check system RBAC first
if not self.checkRbacPermission(TrusteeAccess, "delete"):
logger.warning(f"User {self.userId} lacks system permission to delete access")
return False
# Get existing access to check organisation
existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Access record {accessId} not found")
return False
organisationId = existing.get("organisationId")
# Users with ALL access level bypass feature-level permission check
accessLevel = self.getRbacAccessLevel(TrusteeAccess, "delete")
# Check feature-level permission - must have admin role for this organisation (unless ALL access)
if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return False
return self.db.recordDelete(TrusteeAccess, accessId)
# ===== Contract CRUD =====
def createContract(self, data: Dict[str, Any]) -> Optional[TrusteeContract]:
"""Create a new contract."""
organisationId = data.get("organisationId")
# Check combined permission (system RBAC + feature-level)
if not self.checkCombinedPermission(TrusteeContract, "create", organisationId):
logger.warning(f"User {self.userId} lacks permission to create contract for org {organisationId}")
return None
data["mandateId"] = self.mandateId
if "featureInstanceId" not in data:
data["featureInstanceId"] = self.featureInstanceId
import uuid
contractId = data.get("id") or str(uuid.uuid4())
data["id"] = contractId
createdRecord = self.db.recordCreate(TrusteeContract, data)
if createdRecord and createdRecord.get("id"):
return TrusteeContract(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
return None
def getContract(self, contractId: str) -> Optional[TrusteeContract]:
"""Get a single contract by ID."""
records = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
if not records:
return None
return TrusteeContract(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def getAllContracts(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all contracts with RBAC filtering + feature-level access filtering."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeContract,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
)
# Step 2: Feature-level filtering based on trustee.access
records = self.filterRecordsByTrusteeAccess(records, TrusteeContract)
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = records[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = records
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getContractsByOrganisation(self, organisationId: str) -> List[TrusteeContract]:
"""Get all contracts for a specific organisation."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeContract,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="label"
)
# Step 2: Feature-level filtering based on trustee.access
filtered = self.filterRecordsByTrusteeAccess(records, TrusteeContract)
return [TrusteeContract(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[TrusteeContract]:
"""Update a contract (organisationId is immutable)."""
# Get existing contract to check organisation
existingRecords = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Contract {contractId} not found")
return None
organisationId = existing.get("organisationId")
# Check combined permission (system RBAC + feature-level)
if not self.checkCombinedPermission(TrusteeContract, "update", organisationId):
logger.warning(f"User {self.userId} lacks permission to update contract in org {organisationId}")
return None
# Check if organisationId is being changed (not allowed)
if "organisationId" in data and data["organisationId"] != organisationId:
logger.error("Contract organisationId cannot be changed after creation")
return None
data["id"] = contractId
updatedRecord = self.db.recordModify(TrusteeContract, contractId, data)
if not updatedRecord:
return None
return TrusteeContract(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
def deleteContract(self, contractId: str) -> bool:
"""Delete a contract."""
# Get existing contract to check organisation
existingRecords = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Contract {contractId} not found")
return False
organisationId = existing.get("organisationId")
# Check combined permission (system RBAC + feature-level)
if not self.checkCombinedPermission(TrusteeContract, "delete", organisationId):
logger.warning(f"User {self.userId} lacks permission to delete contract in org {organisationId}")
return False
return self.db.recordDelete(TrusteeContract, contractId)
# ===== Document CRUD =====
def createDocument(self, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
"""Create a new document.
Note: organisationId and contractId removed - feature instance IS the organisation.
Permission is checked via system RBAC (feature-level access).
"""
# Check system RBAC permission
if not self.checkCombinedPermission(TrusteeDocument, "create"):
logger.warning(f"User {self.userId} lacks permission to create document")
return None
# Auto-set context fields
data["mandateId"] = self.mandateId
data["featureInstanceId"] = self.featureInstanceId
import uuid
documentId = data.get("id") or str(uuid.uuid4())
data["id"] = documentId
createdRecord = self.db.recordCreate(TrusteeDocument, data)
if createdRecord and createdRecord.get("id"):
# Remove binary data and metadata from Pydantic model
cleanedRecord = {k: v for k, v in createdRecord.items() if not k.startswith("_") and k != "documentData"}
return TrusteeDocument(**cleanedRecord)
return None
def getDocument(self, documentId: str) -> Optional[TrusteeDocument]:
"""Get a single document by ID (metadata only)."""
records = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
if not records:
return None
# Remove binary data and metadata from Pydantic model
cleanedRecord = {k: v for k, v in records[0].items() if not k.startswith("_") and k != "documentData"}
return TrusteeDocument(**cleanedRecord)
def getDocumentData(self, documentId: str) -> Optional[bytes]:
"""Get document binary data."""
records = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
record = records[0] if records else None
if record:
return record.get("documentData")
return None
def getAllDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all documents with RBAC filtering + feature-level access filtering (metadata only)."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeDocument,
currentUser=self.currentUser,
recordFilter=None,
orderBy="documentName"
)
# Step 2: Feature-level filtering based on trustee.access
# This applies userreport filtering (only own records)
records = self.filterRecordsByTrusteeAccess(records, TrusteeDocument)
# Convert dicts to Pydantic objects (remove binary data and internal fields)
pydanticItems = []
for record in records:
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_") and k != "documentData"}
pydanticItems.append(TrusteeDocument(**cleanedRecord))
totalItems = len(pydanticItems)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = pydanticItems[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = pydanticItems
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getDocumentsByContract(self, contractId: str) -> List[TrusteeDocument]:
"""Get all documents for a specific contract."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeDocument,
currentUser=self.currentUser,
recordFilter={"contractId": contractId},
orderBy="documentName"
)
# Step 2: Feature-level filtering based on trustee.access
filtered = self.filterRecordsByTrusteeAccess(records, TrusteeDocument)
result = []
for record in filtered:
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_") and k != "documentData"}
result.append(TrusteeDocument(**cleanedRecord))
return result
def updateDocument(self, documentId: str, data: Dict[str, Any]) -> Optional[TrusteeDocument]:
"""Update a document.
Note: organisationId and contractId removed - feature instance IS the organisation.
"""
# Get existing document to check creator
existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Document {documentId} not found")
return None
createdBy = existing.get("_createdBy")
# Check system RBAC permission (userreport can only edit their own records)
if not self.checkCombinedPermission(TrusteeDocument, "update", recordCreatedBy=createdBy):
logger.warning(f"User {self.userId} lacks permission to update document")
return None
data["id"] = documentId
updatedRecord = self.db.recordModify(TrusteeDocument, documentId, data)
if not updatedRecord:
return None
cleanedRecord = {k: v for k, v in updatedRecord.items() if not k.startswith("_") and k != "documentData"}
return TrusteeDocument(**cleanedRecord)
def deleteDocument(self, documentId: str) -> bool:
"""Delete a document.
Note: organisationId and contractId removed - feature instance IS the organisation.
"""
# Get existing document to check creator
existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Document {documentId} not found")
return False
createdBy = existing.get("_createdBy")
# Check system RBAC permission (userreport can only delete their own records)
if not self.checkCombinedPermission(TrusteeDocument, "delete", recordCreatedBy=createdBy):
logger.warning(f"User {self.userId} lacks permission to delete document")
return False
return self.db.recordDelete(TrusteeDocument, documentId)
# ===== Position CRUD =====
def createPosition(self, data: Dict[str, Any]) -> Optional[TrusteePosition]:
"""Create a new position.
Note: organisationId and contractId removed - feature instance IS the organisation.
Permission is checked via system RBAC (feature-level access).
"""
# Check system RBAC permission
if not self.checkCombinedPermission(TrusteePosition, "create"):
logger.warning(f"User {self.userId} lacks permission to create position")
return None
# Auto-set context fields
data["mandateId"] = self.mandateId
data["featureInstanceId"] = self.featureInstanceId
# Calculate VAT amount if not provided
if "vatAmount" not in data or data.get("vatAmount") == 0:
bookingAmount = data.get("bookingAmount", 0)
vatPercentage = data.get("vatPercentage", 0)
data["vatAmount"] = bookingAmount * vatPercentage / 100
import uuid
positionId = data.get("id") or str(uuid.uuid4())
data["id"] = positionId
createdRecord = self.db.recordCreate(TrusteePosition, data)
if createdRecord and createdRecord.get("id"):
return TrusteePosition(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
return None
def getPosition(self, positionId: str) -> Optional[TrusteePosition]:
"""Get a single position by ID."""
records = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
if not records:
return None
return TrusteePosition(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all positions with RBAC filtering + feature-level access filtering."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter=None,
orderBy="valuta"
)
# Step 2: Feature-level filtering based on trustee.access
# This applies userreport filtering (only own records)
records = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
# Convert dicts to Pydantic objects (remove internal fields)
pydanticItems = []
for record in records:
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_")}
pydanticItems.append(TrusteePosition(**cleanedRecord))
totalItems = len(pydanticItems)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = pydanticItems[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = pydanticItems
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getPositionsByContract(self, contractId: str) -> List[TrusteePosition]:
"""Get all positions for a specific contract."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter={"contractId": contractId},
orderBy="valuta"
)
# Step 2: Feature-level filtering based on trustee.access
filtered = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
def getPositionsByOrganisation(self, organisationId: str) -> List[TrusteePosition]:
"""Get all positions for a specific organisation."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="valuta"
)
# Step 2: Feature-level filtering based on trustee.access
filtered = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]:
"""Update a position.
Note: organisationId and contractId removed - feature instance IS the organisation.
"""
# Get existing position to check creator
existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Position {positionId} not found")
return None
createdBy = existing.get("_createdBy")
# Check system RBAC permission (userreport can only edit their own records)
if not self.checkCombinedPermission(TrusteePosition, "update", recordCreatedBy=createdBy):
logger.warning(f"User {self.userId} lacks permission to update position")
return None
data["id"] = positionId
updatedRecord = self.db.recordModify(TrusteePosition, positionId, data)
if not updatedRecord:
return None
return TrusteePosition(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
def deletePosition(self, positionId: str) -> bool:
"""Delete a position.
Note: organisationId and contractId removed - feature instance IS the organisation.
"""
# Get existing position to check creator
existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Position {positionId} not found")
return False
createdBy = existing.get("_createdBy")
# Check system RBAC permission (userreport can only delete their own records)
if not self.checkCombinedPermission(TrusteePosition, "delete", recordCreatedBy=createdBy):
logger.warning(f"User {self.userId} lacks permission to delete position")
return False
return self.db.recordDelete(TrusteePosition, positionId)
# ===== Position-Document Link CRUD =====
def createPositionDocument(self, data: Dict[str, Any]) -> Optional[TrusteePositionDocument]:
"""Create a new position-document link.
Note: organisationId and contractId removed - feature instance IS the organisation.
Permission is checked via system RBAC (feature-level access).
"""
# Check system RBAC permission
if not self.checkCombinedPermission(TrusteePositionDocument, "create"):
logger.warning(f"User {self.userId} lacks permission to create position-document link")
return None
# Auto-set context fields
data["mandateId"] = self.mandateId
data["featureInstanceId"] = self.featureInstanceId
import uuid
linkId = data.get("id") or str(uuid.uuid4())
data["id"] = linkId
createdRecord = self.db.recordCreate(TrusteePositionDocument, data)
if createdRecord and createdRecord.get("id"):
return TrusteePositionDocument(**{k: v for k, v in createdRecord.items() if not k.startswith("_")})
return None
def getPositionDocument(self, linkId: str) -> Optional[TrusteePositionDocument]:
"""Get a single position-document link by ID."""
records = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
if not records:
return None
return TrusteePositionDocument(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def getAllPositionDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all position-document links with RBAC filtering + feature-level access filtering."""
# Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
)
# Step 2: Feature-level filtering based on trustee.access
# This applies userreport filtering (only own records)
records = self.filterRecordsByTrusteeAccess(records, TrusteePositionDocument)
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
page = params.page or 1
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
items = records[startIdx:endIdx]
totalPages = math.ceil(totalItems / pageSize) if pageSize > 0 else 1
else:
items = records
totalPages = 1
page = 1
pageSize = totalItems
return PaginatedResult(
items=items,
totalItems=totalItems,
totalPages=totalPages
)
def getDocumentsForPosition(self, positionId: str) -> List[TrusteePositionDocument]:
"""Get all documents linked to a position."""
# Step 1: System RBAC filtering
links = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
currentUser=self.currentUser,
recordFilter={"positionId": positionId},
orderBy="id"
)
# Step 2: Feature-level filtering based on trustee.access
filtered = self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument)
return [TrusteePositionDocument(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
def getPositionsForDocument(self, documentId: str) -> List[TrusteePositionDocument]:
"""Get all positions linked to a document."""
# Step 1: System RBAC filtering
links = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
currentUser=self.currentUser,
recordFilter={"documentId": documentId},
orderBy="id"
)
# Step 2: Feature-level filtering based on trustee.access
filtered = self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument)
return [TrusteePositionDocument(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
def deletePositionDocument(self, linkId: str) -> bool:
"""Delete a position-document link.
Note: organisationId and contractId removed - feature instance IS the organisation.
"""
# Get existing link to check creator
existingRecords = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
existing = existingRecords[0] if existingRecords else None
if not existing:
logger.warning(f"Position-document link {linkId} not found")
return False
createdBy = existing.get("_createdBy")
# Check system RBAC permission (userreport can only delete their own records)
if not self.checkCombinedPermission(TrusteePositionDocument, "delete", recordCreatedBy=createdBy):
logger.warning(f"User {self.userId} lacks permission to delete position-document link")
return False
return self.db.recordDelete(TrusteePositionDocument, linkId)
# ===== Trustee-specific Access Check =====
def getUserAccessForOrganisation(self, userId: str, organisationId: str) -> List[Dict[str, Any]]:
"""Get all access records for a user in a specific organisation."""
return self.db.getRecordset(
TrusteeAccess,
{"userId": userId, "organisationId": organisationId}
)
def getAllUserAccess(self, userId: str) -> List[Dict[str, Any]]:
"""Get all access records for a user across all organisations."""
return self.db.getRecordset(TrusteeAccess, {"userId": userId})
def getUserTrusteeRoles(self, userId: str, organisationId: str, contractId: Optional[str] = None) -> List[str]:
"""
Get all trustee roles a user has for an organisation (and optionally contract).
Args:
userId: User ID to check
organisationId: Organisation ID
contractId: Optional contract ID for contract-specific access
Returns:
List of role IDs the user has
"""
accessRecords = self.getUserAccessForOrganisation(userId, organisationId)
roles = []
for access in accessRecords:
accessContractId = access.get("contractId")
roleId = access.get("roleId")
# If access has no contractId, it grants access to all contracts
if accessContractId is None:
if roleId not in roles:
roles.append(roleId)
# If checking for specific contract, match it
elif contractId and accessContractId == contractId:
if roleId not in roles:
roles.append(roleId)
# If no specific contract requested but access is contract-specific,
# only include if we're not filtering by contract
elif contractId is None:
# User has contract-specific access, but we're not filtering
# They can see the record but only for their specific contracts
if roleId not in roles:
roles.append(roleId)
return roles
def checkUserTrusteePermission(
self,
userId: str,
organisationId: str,
requiredRole: str,
contractId: Optional[str] = None
) -> bool:
"""
Check if a user has a specific role for an organisation (and optionally contract).
Args:
userId: User ID to check
organisationId: Organisation ID
requiredRole: Required role (userreport, admin, operate)
contractId: Optional contract ID for contract-specific access
Returns:
True if user has the required role
"""
roles = self.getUserTrusteeRoles(userId, organisationId, contractId)
return requiredRole in roles
def checkCombinedPermission(
self,
modelClass: type,
operation: str,
organisationId: Optional[str] = None,
contractId: Optional[str] = None,
recordCreatedBy: Optional[str] = None
) -> bool:
"""
Check combined system RBAC + feature-level RBAC permissions.
Args:
modelClass: The model class (e.g., TrusteeContract)
operation: Operation type (read, create, update, delete)
organisationId: Optional organisation ID for feature-level check
contractId: Optional contract ID for contract-level filtering
recordCreatedBy: Optional creator ID for userreport role checks
Returns:
True if user has permission
"""
# Step 1: Check system RBAC and get access level
accessLevel = self.getRbacAccessLevel(modelClass, operation)
if accessLevel == AccessLevel.NONE:
return False
# Users with ALL access level bypass feature-level checks
if accessLevel == AccessLevel.ALL:
return True
# Step 2: If no organisationId, system RBAC is sufficient (for listing all)
if organisationId is None:
return True
# Step 3: Check feature-level RBAC via trustee.access
roles = self.getUserTrusteeRoles(self.userId, organisationId, contractId)
if not roles:
# No trustee access for this organisation
return False
# Check role-based permissions
# admin role: full CRUD for organisation
if "admin" in roles:
return True
# operate role: CRUD for contracts, documents, positions
if "operate" in roles:
if modelClass in (TrusteeContract, TrusteeDocument, TrusteePosition, TrusteePositionDocument):
return True
# operate can read organisations
if modelClass == TrusteeOrganisation and operation == "read":
return True
# userreport role: CRUD own records for documents/positions
if "userreport" in roles:
if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument):
# For create, always allowed
if operation == "create":
return True
# For read/update/delete, must be own record
if recordCreatedBy == self.userId:
return True
# userreport can read organisations and contracts
if modelClass in (TrusteeOrganisation, TrusteeContract) and operation == "read":
return True
return False
def filterRecordsByTrusteeAccess(
self,
records: List[Dict[str, Any]],
modelClass: type
) -> List[Dict[str, Any]]:
"""
Filter records based on user's trustee.access permissions.
Args:
records: List of records to filter
modelClass: The model class for determining filter logic
Returns:
Filtered list of records
"""
if not records:
return records
# Users with ALL access level bypass feature-level filtering
accessLevel = self.getRbacAccessLevel(modelClass, "read")
if accessLevel == AccessLevel.ALL:
return records
# NEW: Feature-instance based access (new system)
# If featureInstanceId is set, user has access via FeatureAccess system.
# Data is already filtered by featureInstanceId in getRecordsetWithRBAC.
# The old TrusteeAccess system (organisation-based) is not used for
# feature-instance scoped data.
if self.featureInstanceId:
return records # User already has access to this feature instance
# LEGACY: TrusteeAccess based filtering (for backwards compatibility)
# Get all user's access records
userAccess = self.getAllUserAccess(self.userId)
if not userAccess:
# No trustee access at all - return empty for trustee tables
return []
# Build lookup for user's accessible organisations and contracts
accessByOrg = {} # {orgId: {'roles': [...], 'contracts': [...]}}
hasFullOrgAccess = {} # {orgId: True} if user has access without contractId restriction
for access in userAccess:
orgId = access.get("organisationId")
roleId = access.get("roleId")
contractIdAccess = access.get("contractId")
if orgId not in accessByOrg:
accessByOrg[orgId] = {"roles": [], "contracts": []}
if roleId not in accessByOrg[orgId]["roles"]:
accessByOrg[orgId]["roles"].append(roleId)
if contractIdAccess is None:
hasFullOrgAccess[orgId] = True
elif contractIdAccess not in accessByOrg[orgId]["contracts"]:
accessByOrg[orgId]["contracts"].append(contractIdAccess)
filteredRecords = []
for record in records:
orgId = record.get("organisationId")
contractId = record.get("contractId")
createdBy = record.get("_createdBy")
# For Organisation model, filter by accessible organisations
if modelClass == TrusteeOrganisation:
recordOrgId = record.get("id")
if recordOrgId in accessByOrg:
filteredRecords.append(record)
continue
# Check if user has access to this organisation
if orgId not in accessByOrg:
continue
roles = accessByOrg[orgId]["roles"]
# admin has full access to organisation
if "admin" in roles:
# Check contract filtering
if contractId:
if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
filteredRecords.append(record)
else:
filteredRecords.append(record)
continue
# operate has full access to organisation data
if "operate" in roles:
if contractId:
if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
filteredRecords.append(record)
else:
filteredRecords.append(record)
continue
# userreport can only see own records for documents/positions
if "userreport" in roles:
if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument):
# Must be own record
if createdBy == self.userId:
# Also check contract access
if contractId:
if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
filteredRecords.append(record)
else:
filteredRecords.append(record)
elif modelClass == TrusteeContract:
# Can read contracts in their organisation
if contractId:
if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
filteredRecords.append(record)
else:
# For contracts table, check if record's id is in accessible contracts
recordContractId = record.get("id")
if hasFullOrgAccess.get(orgId) or recordContractId in accessByOrg[orgId]["contracts"]:
filteredRecords.append(record)
continue
return filteredRecords