diff --git a/modules/features/dynamicOptions/mainDynamicOptions.py b/modules/features/dynamicOptions/mainDynamicOptions.py index 364fa9da..b75e906b 100644 --- a/modules/features/dynamicOptions/mainDynamicOptions.py +++ b/modules/features/dynamicOptions/mainDynamicOptions.py @@ -53,7 +53,9 @@ def getOptions(optionsName: str, services, currentUser: Optional[User] = None) - Raises: ValueError: If optionsName is not recognized """ + logger.debug(f"getOptions called with optionsName='{optionsName}' (repr: {repr(optionsName)})") optionsNameLower = optionsName.lower() + logger.debug(f"optionsNameLower='{optionsNameLower}'") if optionsNameLower == "user.role": # Fetch roles from database @@ -118,7 +120,100 @@ def getOptions(optionsName: str, services, currentUser: Optional[User] = None) - logger.error(f"Error fetching user connections for options: {e}") return [] + elif optionsNameLower in ("user", "users"): + # Dynamic options: Get all users for the current mandate + if not currentUser: + return [] + + try: + users = services.interfaceDbApp.getUsersByMandate(currentUser.mandateId) + + # Handle both list and PaginatedResult + if hasattr(users, 'items'): + userList = users.items + else: + userList = users + + return [ + { + "value": user.id, + "label": user.fullName or user.username or user.email or user.id + } + for user in userList + ] + except Exception as e: + logger.error(f"Error fetching users for options: {e}") + return [] + + elif optionsNameLower in ("trusteeorganisation", "trustee.organisation"): + # Dynamic options: Get all trustee organisations + if not currentUser: + return [] + + try: + result = services.interfaceDbTrustee.getAllOrganisations() + + # Handle PaginatedResult + items = result.items if hasattr(result, 'items') else result + + return [ + { + "value": org.get("id") if isinstance(org, dict) else org.id, + "label": org.get("label") if isinstance(org, dict) else org.label + } + for org in items + ] + except Exception as e: + logger.error(f"Error fetching trustee organisations for options: {e}") + return [] + + elif optionsNameLower in ("trusteerole", "trustee.role"): + # Dynamic options: Get all trustee roles + if not currentUser: + return [] + + try: + result = services.interfaceDbTrustee.getAllRoles() + + # Handle PaginatedResult + items = result.items if hasattr(result, 'items') else result + + return [ + { + "value": role.get("id") if isinstance(role, dict) else role.id, + # TrusteeRole uses 'desc' field, not 'label' + "label": role.get("desc", role.get("id")) if isinstance(role, dict) else getattr(role, "desc", role.id) + } + for role in items + ] + except Exception as e: + logger.error(f"Error fetching trustee roles for options: {e}") + return [] + + elif optionsNameLower in ("trusteecontract", "trustee.contract"): + # Dynamic options: Get all trustee contracts + if not currentUser: + return [] + + try: + result = services.interfaceDbTrustee.getAllContracts() + + # Handle PaginatedResult + items = result.items if hasattr(result, 'items') else result + + return [ + { + "value": contract.get("id") if isinstance(contract, dict) else contract.id, + "label": contract.get("label") if isinstance(contract, dict) else (contract.get("name") if isinstance(contract, dict) else getattr(contract, "label", getattr(contract, "name", contract.id))) + } + for contract in items + ] + except Exception as e: + logger.error(f"Error fetching trustee contracts for options: {e}") + return [] + else: + logger.error(f"Unknown options name: '{optionsName}' (lower: '{optionsNameLower}')") raise ValueError(f"Unknown options name: {optionsName}") @@ -134,5 +229,9 @@ def getAvailableOptionsNames() -> List[str]: "auth.authority", "connection.status", "user.connection", + "User", + "TrusteeOrganisation", + "TrusteeRole", + "TrusteeContract", ] diff --git a/modules/interfaces/interfaceDbTrusteeObjects.py b/modules/interfaces/interfaceDbTrusteeObjects.py index 2cfa517e..54fc25c7 100644 --- a/modules/interfaces/interfaceDbTrusteeObjects.py +++ b/modules/interfaces/interfaceDbTrusteeObjects.py @@ -151,6 +151,27 @@ class TrusteeObjects: return True + def getRbacAccessLevel(self, modelClass: type, operation: str) -> AccessLevel: + """Get the RBAC access level for a specific operation on a table. + + Returns: + AccessLevel (ALL, GROUP, MY, NONE) - determines what filtering is needed + """ + if not self.rbac or not self.currentUser: + return AccessLevel.NONE + + tableName = modelClass.__name__ + permissions = self.rbac.getUserPermissions( + self.currentUser, + AccessRuleContext.DATA, + tableName + ) + + if not permissions.view: + return AccessLevel.NONE + + return getattr(permissions, operation, AccessLevel.NONE) + # ===== Organisation CRUD ===== def createOrganisation(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: @@ -173,17 +194,29 @@ class TrusteeObjects: logger.error(f"Invalid organisation ID format: {orgId}") return None - success = self.db.saveRecord(TrusteeOrganisation, orgId, data) - if success: - return self.db.getRecord(TrusteeOrganisation, orgId) + createdRecord = self.db.recordCreate(TrusteeOrganisation, data) + if createdRecord and createdRecord.get("id"): + return createdRecord return None def getOrganisation(self, orgId: str) -> Optional[Dict[str, Any]]: """Get a single organisation by ID.""" - return self.db.getRecord(TrusteeOrganisation, orgId) + records = self.db.getRecordset(TrusteeOrganisation, recordFilter={"id": orgId}) + return records[0] if records else None def getAllOrganisations(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all organisations with RBAC filtering.""" + """Get all organisations with RBAC filtering. + + Note: Organisations are managed at system level (by mandate). + Feature-level filtering (trustee.access) is NOT applied here because: + - Admin users with system RBAC can manage all orgs in their mandate + - trustee.access grants access to specific orgs for other users + - New organisations wouldn't be visible without an access record + """ + # Debug: Log user info and permissions + logger.debug(f"getAllOrganisations called for user {self.userId}, roles: {self.currentUser.roleLabels if self.currentUser else 'None'}, mandateId: {self.mandateId}") + + # System RBAC filtering (filters by mandate for GROUP access level) records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeOrganisation, @@ -191,6 +224,7 @@ class TrusteeObjects: recordFilter=None, orderBy="id" ) + logger.debug(f"getAllOrganisations: getRecordsetWithRBAC returned {len(records)} records") # Apply pagination totalItems = len(records) @@ -225,10 +259,8 @@ class TrusteeObjects: return None data["id"] = orgId - success = self.db.saveRecord(TrusteeOrganisation, orgId, data) - if success: - return self.db.getRecord(TrusteeOrganisation, orgId) - return None + updatedRecord = self.db.recordModify(TrusteeOrganisation, orgId, data) + return updatedRecord def deleteOrganisation(self, orgId: str) -> bool: """Delete an organisation.""" @@ -236,7 +268,7 @@ class TrusteeObjects: logger.warning(f"User {self.userId} lacks permission to delete organisation") return False - return self.db.deleteRecord(TrusteeOrganisation, orgId) + return self.db.recordDelete(TrusteeOrganisation, orgId) # ===== Role CRUD ===== @@ -253,17 +285,23 @@ class TrusteeObjects: logger.error("Role ID is required") return None - success = self.db.saveRecord(TrusteeRole, roleId, data) - if success: - return self.db.getRecord(TrusteeRole, roleId) + createdRecord = self.db.recordCreate(TrusteeRole, data) + if createdRecord and createdRecord.get("id"): + return createdRecord return None def getRole(self, roleId: str) -> Optional[Dict[str, Any]]: """Get a single role by ID.""" - return self.db.getRecord(TrusteeRole, roleId) + records = self.db.getRecordset(TrusteeRole, recordFilter={"id": roleId}) + return records[0] if records else None def getAllRoles(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all roles with RBAC filtering.""" + """Get all roles with RBAC filtering. + + Note: Roles are available to all users with trustee access. + They are not filtered by organisation since they define the role types. + Users with ALL access level see all roles; others need trustee.access records. + """ records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeRole, @@ -272,6 +310,14 @@ class TrusteeObjects: orderBy="id" ) + # Users with ALL access level (from system RBAC) see all roles + # Others need at least one trustee.access record + accessLevel = self.getRbacAccessLevel(TrusteeRole, "read") + if accessLevel != AccessLevel.ALL: + userAccess = self.getAllUserAccess(self.userId) + if not userAccess: + records = [] # No trustee access at all + totalItems = len(records) if params: pageSize = params.pageSize or 20 @@ -299,10 +345,8 @@ class TrusteeObjects: return None data["id"] = roleId - success = self.db.saveRecord(TrusteeRole, roleId, data) - if success: - return self.db.getRecord(TrusteeRole, roleId) - return None + updatedRecord = self.db.recordModify(TrusteeRole, roleId, data) + return updatedRecord def deleteRole(self, roleId: str) -> bool: """Delete a role (sysadmin only, not if in use).""" @@ -311,19 +355,30 @@ class TrusteeObjects: return False # Check if role is in use - accessRecords = self.db.getRecordset(TrusteeAccess, {"roleId": roleId}) + accessRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"roleId": roleId}) if accessRecords: logger.error(f"Cannot delete role {roleId}: still in use") return False - return self.db.deleteRecord(TrusteeRole, roleId) + return self.db.recordDelete(TrusteeRole, roleId) # ===== Access CRUD ===== def createAccess(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Create a new access record.""" + """Create a new access record. Requires admin role for the organisation or ALL access level.""" + # Check system RBAC first if not self.checkRbacPermission(TrusteeAccess, "create"): - logger.warning(f"User {self.userId} lacks permission to create access") + logger.warning(f"User {self.userId} lacks system permission to create access") + return None + + organisationId = data.get("organisationId") + + # Users with ALL access level bypass feature-level permission check + accessLevel = self.getRbacAccessLevel(TrusteeAccess, "create") + + # Check feature-level permission - must have admin role for this organisation (unless ALL access) + if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"): + logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}") return None data["mandateId"] = self.mandateId @@ -332,17 +387,23 @@ class TrusteeObjects: accessId = data.get("id") or str(uuid.uuid4()) data["id"] = accessId - success = self.db.saveRecord(TrusteeAccess, accessId, data) - if success: - return self.db.getRecord(TrusteeAccess, accessId) + createdRecord = self.db.recordCreate(TrusteeAccess, data) + if createdRecord and createdRecord.get("id"): + return createdRecord return None def getAccess(self, accessId: str) -> Optional[Dict[str, Any]]: """Get a single access record by ID.""" - return self.db.getRecord(TrusteeAccess, accessId) + records = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId}) + return records[0] if records else None def getAllAccess(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all access records with RBAC filtering.""" + """Get all access records with RBAC filtering + feature-level filtering. + + Users with ALL access level see all access records. + Others can only see access records for organisations they have admin access to. + """ + # Step 1: System RBAC filtering records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeAccess, @@ -351,6 +412,25 @@ class TrusteeObjects: orderBy="id" ) + # Users with ALL access level (from system RBAC) see all records + accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read") + + if accessLevel != AccessLevel.ALL: + # Step 2: Feature-level filtering - only see access for organisations where user is admin + userAccess = self.getAllUserAccess(self.userId) + + # Get organisations where user has admin role + adminOrgs = set() + for access in userAccess: + if access.get("roleId") == "admin": + adminOrgs.add(access.get("organisationId")) + + # Filter records to only show those in admin organisations + if adminOrgs: + records = [r for r in records if r.get("organisationId") in adminOrgs] + else: + records = [] + totalItems = len(records) if params: pageSize = params.pageSize or 20 @@ -372,7 +452,15 @@ class TrusteeObjects: ) def getAccessByOrganisation(self, organisationId: str) -> List[Dict[str, Any]]: - """Get all access records for a specific organisation.""" + """Get all access records for a specific organisation. + + Requires admin role for the organisation. + """ + # Check if user has admin access for this organisation + if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"): + logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}") + return [] + return getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeAccess, @@ -382,41 +470,98 @@ class TrusteeObjects: ) def getAccessByUser(self, userId: str) -> List[Dict[str, Any]]: - """Get all access records for a specific user.""" - return getRecordsetWithRBAC( + """Get all access records for a specific user. + + Users with ALL access level see all access records. + Others can only see access records for organisations where they have admin role. + """ + records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeAccess, currentUser=self.currentUser, recordFilter={"userId": userId}, orderBy="id" ) + + # Users with ALL access level (from system RBAC) see all records + accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read") + if accessLevel == AccessLevel.ALL: + return records + + # Filter to only organisations where current user has admin role + userAccess = self.getAllUserAccess(self.userId) + adminOrgs = set() + for access in userAccess: + if access.get("roleId") == "admin": + adminOrgs.add(access.get("organisationId")) + + return [r for r in records if r.get("organisationId") in adminOrgs] def updateAccess(self, accessId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Update an access record.""" + """Update an access record. Requires admin role for the organisation or ALL access level.""" + # Check system RBAC first if not self.checkRbacPermission(TrusteeAccess, "update"): - logger.warning(f"User {self.userId} lacks permission to update access") + logger.warning(f"User {self.userId} lacks system permission to update access") + return None + + # Get existing access to check organisation + existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Access record {accessId} not found") + return None + + organisationId = existing.get("organisationId") + + # Users with ALL access level bypass feature-level permission check + accessLevel = self.getRbacAccessLevel(TrusteeAccess, "update") + + # Check feature-level permission - must have admin role for this organisation (unless ALL access) + if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"): + logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}") return None data["id"] = accessId - success = self.db.saveRecord(TrusteeAccess, accessId, data) - if success: - return self.db.getRecord(TrusteeAccess, accessId) - return None + updatedRecord = self.db.recordModify(TrusteeAccess, accessId, data) + return updatedRecord def deleteAccess(self, accessId: str) -> bool: - """Delete an access record.""" + """Delete an access record. Requires admin role for the organisation or ALL access level.""" + # Check system RBAC first if not self.checkRbacPermission(TrusteeAccess, "delete"): - logger.warning(f"User {self.userId} lacks permission to delete access") + logger.warning(f"User {self.userId} lacks system permission to delete access") + return False + + # Get existing access to check organisation + existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Access record {accessId} not found") + return False + + organisationId = existing.get("organisationId") + + # Users with ALL access level bypass feature-level permission check + accessLevel = self.getRbacAccessLevel(TrusteeAccess, "delete") + + # Check feature-level permission - must have admin role for this organisation (unless ALL access) + if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"): + logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}") return False - return self.db.deleteRecord(TrusteeAccess, accessId) + return self.db.recordDelete(TrusteeAccess, accessId) # ===== Contract CRUD ===== def createContract(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Create a new contract.""" - if not self.checkRbacPermission(TrusteeContract, "create"): - logger.warning(f"User {self.userId} lacks permission to create contract") + organisationId = data.get("organisationId") + + # Check combined permission (system RBAC + feature-level) + if not self.checkCombinedPermission(TrusteeContract, "create", organisationId): + logger.warning(f"User {self.userId} lacks permission to create contract for org {organisationId}") return None data["mandateId"] = self.mandateId @@ -425,17 +570,19 @@ class TrusteeObjects: contractId = data.get("id") or str(uuid.uuid4()) data["id"] = contractId - success = self.db.saveRecord(TrusteeContract, contractId, data) - if success: - return self.db.getRecord(TrusteeContract, contractId) + createdRecord = self.db.recordCreate(TrusteeContract, data) + if createdRecord and createdRecord.get("id"): + return createdRecord return None def getContract(self, contractId: str) -> Optional[Dict[str, Any]]: """Get a single contract by ID.""" - return self.db.getRecord(TrusteeContract, contractId) + records = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId}) + return records[0] if records else None def getAllContracts(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all contracts with RBAC filtering.""" + """Get all contracts with RBAC filtering + feature-level access filtering.""" + # Step 1: System RBAC filtering records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeContract, @@ -444,6 +591,9 @@ class TrusteeObjects: orderBy="id" ) + # Step 2: Feature-level filtering based on trustee.access + records = self.filterRecordsByTrusteeAccess(records, TrusteeContract) + totalItems = len(records) if params: pageSize = params.pageSize or 20 @@ -466,47 +616,73 @@ class TrusteeObjects: def getContractsByOrganisation(self, organisationId: str) -> List[Dict[str, Any]]: """Get all contracts for a specific organisation.""" - return getRecordsetWithRBAC( + # Step 1: System RBAC filtering + records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeContract, currentUser=self.currentUser, recordFilter={"organisationId": organisationId}, orderBy="label" ) + + # Step 2: Feature-level filtering based on trustee.access + return self.filterRecordsByTrusteeAccess(records, TrusteeContract) def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update a contract (organisationId is immutable).""" - if not self.checkRbacPermission(TrusteeContract, "update"): - logger.warning(f"User {self.userId} lacks permission to update contract") + # Get existing contract to check organisation + existingRecords = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Contract {contractId} not found") + return None + + organisationId = existing.get("organisationId") + + # Check combined permission (system RBAC + feature-level) + if not self.checkCombinedPermission(TrusteeContract, "update", organisationId): + logger.warning(f"User {self.userId} lacks permission to update contract in org {organisationId}") return None - # Check if organisationId is being changed - existing = self.db.getRecord(TrusteeContract, contractId) - if existing and "organisationId" in data: - if data["organisationId"] != existing.get("organisationId"): - logger.error("Contract organisationId cannot be changed after creation") - return None + # Check if organisationId is being changed (not allowed) + if "organisationId" in data and data["organisationId"] != organisationId: + logger.error("Contract organisationId cannot be changed after creation") + return None data["id"] = contractId - success = self.db.saveRecord(TrusteeContract, contractId, data) - if success: - return self.db.getRecord(TrusteeContract, contractId) - return None + updatedRecord = self.db.recordModify(TrusteeContract, contractId, data) + return updatedRecord def deleteContract(self, contractId: str) -> bool: """Delete a contract.""" - if not self.checkRbacPermission(TrusteeContract, "delete"): - logger.warning(f"User {self.userId} lacks permission to delete contract") + # Get existing contract to check organisation + existingRecords = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Contract {contractId} not found") + return False + + organisationId = existing.get("organisationId") + + # Check combined permission (system RBAC + feature-level) + if not self.checkCombinedPermission(TrusteeContract, "delete", organisationId): + logger.warning(f"User {self.userId} lacks permission to delete contract in org {organisationId}") return False - return self.db.deleteRecord(TrusteeContract, contractId) + return self.db.recordDelete(TrusteeContract, contractId) # ===== Document CRUD ===== def createDocument(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Create a new document.""" - if not self.checkRbacPermission(TrusteeDocument, "create"): - logger.warning(f"User {self.userId} lacks permission to create document") + organisationId = data.get("organisationId") + contractId = data.get("contractId") + + # Check combined permission (system RBAC + feature-level) + if not self.checkCombinedPermission(TrusteeDocument, "create", organisationId, contractId): + logger.warning(f"User {self.userId} lacks permission to create document in org {organisationId}") return None data["mandateId"] = self.mandateId @@ -515,28 +691,34 @@ class TrusteeObjects: documentId = data.get("id") or str(uuid.uuid4()) data["id"] = documentId - success = self.db.saveRecord(TrusteeDocument, documentId, data) - if success: - return self.db.getRecord(TrusteeDocument, documentId) + createdRecord = self.db.recordCreate(TrusteeDocument, data) + if createdRecord and createdRecord.get("id"): + # Remove binary data from response + createdRecord.pop("documentData", None) + return createdRecord return None def getDocument(self, documentId: str) -> Optional[Dict[str, Any]]: """Get a single document by ID (metadata only).""" - record = self.db.getRecord(TrusteeDocument, documentId) - if record: + records = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId}) + if records: + record = records[0] # Remove binary data from response record.pop("documentData", None) - return record + return record + return None def getDocumentData(self, documentId: str) -> Optional[bytes]: """Get document binary data.""" - record = self.db.getRecord(TrusteeDocument, documentId) + records = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId}) + record = records[0] if records else None if record: return record.get("documentData") return None def getAllDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all documents with RBAC filtering (metadata only).""" + """Get all documents with RBAC filtering + feature-level access filtering (metadata only).""" + # Step 1: System RBAC filtering records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeDocument, @@ -545,6 +727,10 @@ class TrusteeObjects: orderBy="documentName" ) + # Step 2: Feature-level filtering based on trustee.access + # This applies userreport filtering (only own records) + records = self.filterRecordsByTrusteeAccess(records, TrusteeDocument) + # Remove binary data from responses for record in records: record.pop("documentData", None) @@ -571,6 +757,7 @@ class TrusteeObjects: def getDocumentsByContract(self, contractId: str) -> List[Dict[str, Any]]: """Get all documents for a specific contract.""" + # Step 1: System RBAC filtering records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteeDocument, @@ -578,39 +765,72 @@ class TrusteeObjects: recordFilter={"contractId": contractId}, orderBy="documentName" ) + + # Step 2: Feature-level filtering based on trustee.access + records = self.filterRecordsByTrusteeAccess(records, TrusteeDocument) + for record in records: record.pop("documentData", None) return records def updateDocument(self, documentId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update a document.""" - if not self.checkRbacPermission(TrusteeDocument, "update"): - logger.warning(f"User {self.userId} lacks permission to update document") + # Get existing document to check organisation and creator + existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Document {documentId} not found") + return None + + organisationId = existing.get("organisationId") + contractId = existing.get("contractId") + createdBy = existing.get("_createdBy") + + # Check combined permission (system RBAC + feature-level) + # For userreport, this checks if they created the record + if not self.checkCombinedPermission(TrusteeDocument, "update", organisationId, contractId, createdBy): + logger.warning(f"User {self.userId} lacks permission to update document in org {organisationId}") return None data["id"] = documentId - success = self.db.saveRecord(TrusteeDocument, documentId, data) - if success: - result = self.db.getRecord(TrusteeDocument, documentId) - if result: - result.pop("documentData", None) - return result - return None + updatedRecord = self.db.recordModify(TrusteeDocument, documentId, data) + if updatedRecord: + updatedRecord.pop("documentData", None) + return updatedRecord def deleteDocument(self, documentId: str) -> bool: """Delete a document.""" - if not self.checkRbacPermission(TrusteeDocument, "delete"): - logger.warning(f"User {self.userId} lacks permission to delete document") + # Get existing document to check organisation and creator + existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Document {documentId} not found") + return False + + organisationId = existing.get("organisationId") + contractId = existing.get("contractId") + createdBy = existing.get("_createdBy") + + # Check combined permission (system RBAC + feature-level) + # For userreport, this checks if they created the record + if not self.checkCombinedPermission(TrusteeDocument, "delete", organisationId, contractId, createdBy): + logger.warning(f"User {self.userId} lacks permission to delete document in org {organisationId}") return False - return self.db.deleteRecord(TrusteeDocument, documentId) + return self.db.recordDelete(TrusteeDocument, documentId) # ===== Position CRUD ===== def createPosition(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Create a new position.""" - if not self.checkRbacPermission(TrusteePosition, "create"): - logger.warning(f"User {self.userId} lacks permission to create position") + organisationId = data.get("organisationId") + contractId = data.get("contractId") + + # Check combined permission (system RBAC + feature-level) + if not self.checkCombinedPermission(TrusteePosition, "create", organisationId, contractId): + logger.warning(f"User {self.userId} lacks permission to create position in org {organisationId}") return None data["mandateId"] = self.mandateId @@ -625,17 +845,19 @@ class TrusteeObjects: positionId = data.get("id") or str(uuid.uuid4()) data["id"] = positionId - success = self.db.saveRecord(TrusteePosition, positionId, data) - if success: - return self.db.getRecord(TrusteePosition, positionId) + createdRecord = self.db.recordCreate(TrusteePosition, data) + if createdRecord and createdRecord.get("id"): + return createdRecord return None def getPosition(self, positionId: str) -> Optional[Dict[str, Any]]: """Get a single position by ID.""" - return self.db.getRecord(TrusteePosition, positionId) + records = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId}) + return records[0] if records else None def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all positions with RBAC filtering.""" + """Get all positions with RBAC filtering + feature-level access filtering.""" + # Step 1: System RBAC filtering records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteePosition, @@ -644,6 +866,10 @@ class TrusteeObjects: orderBy="valuta" ) + # Step 2: Feature-level filtering based on trustee.access + # This applies userreport filtering (only own records) + records = self.filterRecordsByTrusteeAccess(records, TrusteePosition) + totalItems = len(records) if params: pageSize = params.pageSize or 20 @@ -666,50 +892,88 @@ class TrusteeObjects: def getPositionsByContract(self, contractId: str) -> List[Dict[str, Any]]: """Get all positions for a specific contract.""" - return getRecordsetWithRBAC( + # Step 1: System RBAC filtering + records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteePosition, currentUser=self.currentUser, recordFilter={"contractId": contractId}, orderBy="valuta" ) + + # Step 2: Feature-level filtering based on trustee.access + return self.filterRecordsByTrusteeAccess(records, TrusteePosition) def getPositionsByOrganisation(self, organisationId: str) -> List[Dict[str, Any]]: """Get all positions for a specific organisation.""" - return getRecordsetWithRBAC( + # Step 1: System RBAC filtering + records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteePosition, currentUser=self.currentUser, recordFilter={"organisationId": organisationId}, orderBy="valuta" ) + + # Step 2: Feature-level filtering based on trustee.access + return self.filterRecordsByTrusteeAccess(records, TrusteePosition) def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update a position.""" - if not self.checkRbacPermission(TrusteePosition, "update"): - logger.warning(f"User {self.userId} lacks permission to update position") + # Get existing position to check organisation and creator + existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Position {positionId} not found") + return None + + organisationId = existing.get("organisationId") + contractId = existing.get("contractId") + createdBy = existing.get("_createdBy") + + # Check combined permission (system RBAC + feature-level) + # For userreport, this checks if they created the record + if not self.checkCombinedPermission(TrusteePosition, "update", organisationId, contractId, createdBy): + logger.warning(f"User {self.userId} lacks permission to update position in org {organisationId}") return None data["id"] = positionId - success = self.db.saveRecord(TrusteePosition, positionId, data) - if success: - return self.db.getRecord(TrusteePosition, positionId) - return None + updatedRecord = self.db.recordModify(TrusteePosition, positionId, data) + return updatedRecord def deletePosition(self, positionId: str) -> bool: """Delete a position.""" - if not self.checkRbacPermission(TrusteePosition, "delete"): - logger.warning(f"User {self.userId} lacks permission to delete position") + # Get existing position to check organisation and creator + existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Position {positionId} not found") + return False + + organisationId = existing.get("organisationId") + contractId = existing.get("contractId") + createdBy = existing.get("_createdBy") + + # Check combined permission (system RBAC + feature-level) + # For userreport, this checks if they created the record + if not self.checkCombinedPermission(TrusteePosition, "delete", organisationId, contractId, createdBy): + logger.warning(f"User {self.userId} lacks permission to delete position in org {organisationId}") return False - return self.db.deleteRecord(TrusteePosition, positionId) + return self.db.recordDelete(TrusteePosition, positionId) # ===== Position-Document Link CRUD ===== def createPositionDocument(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Create a new position-document link.""" - if not self.checkRbacPermission(TrusteePositionDocument, "create"): - logger.warning(f"User {self.userId} lacks permission to create position-document link") + organisationId = data.get("organisationId") + contractId = data.get("contractId") + + # Check combined permission (system RBAC + feature-level) + if not self.checkCombinedPermission(TrusteePositionDocument, "create", organisationId, contractId): + logger.warning(f"User {self.userId} lacks permission to create position-document link in org {organisationId}") return None data["mandateId"] = self.mandateId @@ -718,17 +982,19 @@ class TrusteeObjects: linkId = data.get("id") or str(uuid.uuid4()) data["id"] = linkId - success = self.db.saveRecord(TrusteePositionDocument, linkId, data) - if success: - return self.db.getRecord(TrusteePositionDocument, linkId) + createdRecord = self.db.recordCreate(TrusteePositionDocument, data) + if createdRecord and createdRecord.get("id"): + return createdRecord return None def getPositionDocument(self, linkId: str) -> Optional[Dict[str, Any]]: """Get a single position-document link by ID.""" - return self.db.getRecord(TrusteePositionDocument, linkId) + records = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId}) + return records[0] if records else None def getAllPositionDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult: - """Get all position-document links with RBAC filtering.""" + """Get all position-document links with RBAC filtering + feature-level access filtering.""" + # Step 1: System RBAC filtering records = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteePositionDocument, @@ -737,6 +1003,10 @@ class TrusteeObjects: orderBy="id" ) + # Step 2: Feature-level filtering based on trustee.access + # This applies userreport filtering (only own records) + records = self.filterRecordsByTrusteeAccess(records, TrusteePositionDocument) + totalItems = len(records) if params: pageSize = params.pageSize or 20 @@ -759,6 +1029,7 @@ class TrusteeObjects: def getDocumentsForPosition(self, positionId: str) -> List[Dict[str, Any]]: """Get all documents linked to a position.""" + # Step 1: System RBAC filtering links = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteePositionDocument, @@ -766,10 +1037,13 @@ class TrusteeObjects: recordFilter={"positionId": positionId}, orderBy="id" ) - return links + + # Step 2: Feature-level filtering based on trustee.access + return self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument) def getPositionsForDocument(self, documentId: str) -> List[Dict[str, Any]]: """Get all positions linked to a document.""" + # Step 1: System RBAC filtering links = getRecordsetWithRBAC( connector=self.db, modelClass=TrusteePositionDocument, @@ -777,15 +1051,31 @@ class TrusteeObjects: recordFilter={"documentId": documentId}, orderBy="id" ) - return links + + # Step 2: Feature-level filtering based on trustee.access + return self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument) def deletePositionDocument(self, linkId: str) -> bool: """Delete a position-document link.""" - if not self.checkRbacPermission(TrusteePositionDocument, "delete"): - logger.warning(f"User {self.userId} lacks permission to delete position-document link") + # Get existing link to check organisation and creator + existingRecords = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId}) + existing = existingRecords[0] if existingRecords else None + + if not existing: + logger.warning(f"Position-document link {linkId} not found") + return False + + organisationId = existing.get("organisationId") + contractId = existing.get("contractId") + createdBy = existing.get("_createdBy") + + # Check combined permission (system RBAC + feature-level) + # For userreport, this checks if they created the record + if not self.checkCombinedPermission(TrusteePositionDocument, "delete", organisationId, contractId, createdBy): + logger.warning(f"User {self.userId} lacks permission to delete position-document link in org {organisationId}") return False - return self.db.deleteRecord(TrusteePositionDocument, linkId) + return self.db.recordDelete(TrusteePositionDocument, linkId) # ===== Trustee-specific Access Check ===== @@ -796,6 +1086,47 @@ class TrusteeObjects: {"userId": userId, "organisationId": organisationId} ) + def getAllUserAccess(self, userId: str) -> List[Dict[str, Any]]: + """Get all access records for a user across all organisations.""" + return self.db.getRecordset(TrusteeAccess, {"userId": userId}) + + def getUserTrusteeRoles(self, userId: str, organisationId: str, contractId: Optional[str] = None) -> List[str]: + """ + Get all trustee roles a user has for an organisation (and optionally contract). + + Args: + userId: User ID to check + organisationId: Organisation ID + contractId: Optional contract ID for contract-specific access + + Returns: + List of role IDs the user has + """ + accessRecords = self.getUserAccessForOrganisation(userId, organisationId) + roles = [] + + for access in accessRecords: + accessContractId = access.get("contractId") + roleId = access.get("roleId") + + # If access has no contractId, it grants access to all contracts + if accessContractId is None: + if roleId not in roles: + roles.append(roleId) + # If checking for specific contract, match it + elif contractId and accessContractId == contractId: + if roleId not in roles: + roles.append(roleId) + # If no specific contract requested but access is contract-specific, + # only include if we're not filtering by contract + elif contractId is None: + # User has contract-specific access, but we're not filtering + # They can see the record but only for their specific contracts + if roleId not in roles: + roles.append(roleId) + + return roles + def checkUserTrusteePermission( self, userId: str, @@ -815,22 +1146,188 @@ class TrusteeObjects: Returns: True if user has the required role """ - accessRecords = self.getUserAccessForOrganisation(userId, organisationId) + roles = self.getUserTrusteeRoles(userId, organisationId, contractId) + return requiredRole in roles + + def checkCombinedPermission( + self, + modelClass: type, + operation: str, + organisationId: Optional[str] = None, + contractId: Optional[str] = None, + recordCreatedBy: Optional[str] = None + ) -> bool: + """ + Check combined system RBAC + feature-level RBAC permissions. - for access in accessRecords: - if access.get("roleId") == requiredRole: - accessContractId = access.get("contractId") - - # If access has no contractId, it grants access to all contracts - if accessContractId is None: + Args: + modelClass: The model class (e.g., TrusteeContract) + operation: Operation type (read, create, update, delete) + organisationId: Optional organisation ID for feature-level check + contractId: Optional contract ID for contract-level filtering + recordCreatedBy: Optional creator ID for userreport role checks + + Returns: + True if user has permission + """ + # Step 1: Check system RBAC and get access level + accessLevel = self.getRbacAccessLevel(modelClass, operation) + if accessLevel == AccessLevel.NONE: + return False + + # Users with ALL access level bypass feature-level checks + if accessLevel == AccessLevel.ALL: + return True + + # Step 2: If no organisationId, system RBAC is sufficient (for listing all) + if organisationId is None: + return True + + # Step 3: Check feature-level RBAC via trustee.access + roles = self.getUserTrusteeRoles(self.userId, organisationId, contractId) + + if not roles: + # No trustee access for this organisation + return False + + # Check role-based permissions + # admin role: full CRUD for organisation + if "admin" in roles: + return True + + # operate role: CRUD for contracts, documents, positions + if "operate" in roles: + if modelClass in (TrusteeContract, TrusteeDocument, TrusteePosition, TrusteePositionDocument): + return True + # operate can read organisations + if modelClass == TrusteeOrganisation and operation == "read": + return True + + # userreport role: CRUD own records for documents/positions + if "userreport" in roles: + if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument): + # For create, always allowed + if operation == "create": return True - - # If checking for specific contract, match it - if contractId and accessContractId == contractId: + # For read/update/delete, must be own record + if recordCreatedBy == self.userId: return True - - # If no specific contract requested and access is contract-specific, deny - if contractId is None: - continue + # userreport can read organisations and contracts + if modelClass in (TrusteeOrganisation, TrusteeContract) and operation == "read": + return True return False + + def filterRecordsByTrusteeAccess( + self, + records: List[Dict[str, Any]], + modelClass: type + ) -> List[Dict[str, Any]]: + """ + Filter records based on user's trustee.access permissions. + + Args: + records: List of records to filter + modelClass: The model class for determining filter logic + + Returns: + Filtered list of records + """ + if not records: + return records + + # Users with ALL access level bypass feature-level filtering + accessLevel = self.getRbacAccessLevel(modelClass, "read") + if accessLevel == AccessLevel.ALL: + return records + + # Get all user's access records + userAccess = self.getAllUserAccess(self.userId) + + if not userAccess: + # No trustee access at all - return empty for trustee tables + return [] + + # Build lookup for user's accessible organisations and contracts + accessByOrg = {} # {orgId: {'roles': [...], 'contracts': [...]}} + hasFullOrgAccess = {} # {orgId: True} if user has access without contractId restriction + + for access in userAccess: + orgId = access.get("organisationId") + roleId = access.get("roleId") + contractIdAccess = access.get("contractId") + + if orgId not in accessByOrg: + accessByOrg[orgId] = {"roles": [], "contracts": []} + + if roleId not in accessByOrg[orgId]["roles"]: + accessByOrg[orgId]["roles"].append(roleId) + + if contractIdAccess is None: + hasFullOrgAccess[orgId] = True + elif contractIdAccess not in accessByOrg[orgId]["contracts"]: + accessByOrg[orgId]["contracts"].append(contractIdAccess) + + filteredRecords = [] + + for record in records: + orgId = record.get("organisationId") + contractId = record.get("contractId") + createdBy = record.get("_createdBy") + + # For Organisation model, filter by accessible organisations + if modelClass == TrusteeOrganisation: + recordOrgId = record.get("id") + if recordOrgId in accessByOrg: + filteredRecords.append(record) + continue + + # Check if user has access to this organisation + if orgId not in accessByOrg: + continue + + roles = accessByOrg[orgId]["roles"] + + # admin has full access to organisation + if "admin" in roles: + # Check contract filtering + if contractId: + if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]: + filteredRecords.append(record) + else: + filteredRecords.append(record) + continue + + # operate has full access to organisation data + if "operate" in roles: + if contractId: + if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]: + filteredRecords.append(record) + else: + filteredRecords.append(record) + continue + + # userreport can only see own records for documents/positions + if "userreport" in roles: + if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument): + # Must be own record + if createdBy == self.userId: + # Also check contract access + if contractId: + if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]: + filteredRecords.append(record) + else: + filteredRecords.append(record) + elif modelClass == TrusteeContract: + # Can read contracts in their organisation + if contractId: + if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]: + filteredRecords.append(record) + else: + # For contracts table, check if record's id is in accessible contracts + recordContractId = record.get("id") + if hasFullOrgAccess.get(orgId) or recordContractId in accessByOrg[orgId]["contracts"]: + filteredRecords.append(record) + continue + + return filteredRecords diff --git a/modules/routes/routeDataTrustee.py b/modules/routes/routeDataTrustee.py index f00c3128..bca55df4 100644 --- a/modules/routes/routeDataTrustee.py +++ b/modules/routes/routeDataTrustee.py @@ -70,9 +70,12 @@ async def getOrganisations( currentUser: User = Depends(getCurrentUser) ) -> PaginatedResponse[TrusteeOrganisation]: """Get all organisations with optional pagination.""" + logger = logging.getLogger(__name__) + logger.debug(f"getOrganisations called for user {currentUser.id}, roles: {currentUser.roleLabels}") paginationParams = _parsePagination(pagination) interface = getInterface(currentUser) result = interface.getAllOrganisations(paginationParams) + logger.debug(f"getOrganisations returned {len(result.items)} items") if paginationParams: return PaginatedResponse( diff --git a/modules/routes/routeOptions.py b/modules/routes/routeOptions.py index b4086fd2..a2acad76 100644 --- a/modules/routes/routeOptions.py +++ b/modules/routes/routeOptions.py @@ -47,10 +47,13 @@ async def getOptionsEndpoint( - GET /api/options/connection.status """ try: + logger.debug(f"Options request: {optionsName} for user {currentUser.id}") services = getServices(currentUser, None) options = getOptions(optionsName, services, currentUser) + logger.debug(f"Options response: {optionsName} returned {len(options)} items") return options except ValueError as e: + logger.error(f"ValueError for options {optionsName}: {str(e)}") raise HTTPException( status_code=400, detail=str(e) diff --git a/modules/services/__init__.py b/modules/services/__init__.py index 32e0cb3f..16d2ed6d 100644 --- a/modules/services/__init__.py +++ b/modules/services/__init__.py @@ -57,6 +57,9 @@ class Services: from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface self.interfaceDbComponent = getComponentInterface(user) + from modules.interfaces.interfaceDbTrusteeObjects import getInterface as getTrusteeInterface + self.interfaceDbTrustee = getTrusteeInterface(user) + # Expose RBAC directly on services for convenience self.rbac = self.interfaceDbApp.rbac if self.interfaceDbApp else None