# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Integration tests for RBAC database filtering.

Tests that database queries correctly filter records based on RBAC rules.
Uses real database connection for integration testing.
"""

import logging

import pytest

from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelUam import User, AccessLevel, UserPermissions
from modules.shared.configuration import APP_CONFIG

logger = logging.getLogger(__name__)


@pytest.fixture(scope="class")
def db():
    """Create real database connector for integration tests.

    Connection settings are read from APP_CONFIG, falling back to the
    defaults given below. The connector is shared by all tests of a class
    and closed once that class has finished.
    """
    connector = DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_test"),
        dbUser=APP_CONFIG.get("DB_USER", "postgres"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD", ""),
        dbPort=APP_CONFIG.get("DB_PORT", 5432),
    )
    yield connector
    connector.close()


def _uniformPermissions(level: AccessLevel) -> UserPermissions:
    """Build permissions with view enabled and ``level`` for every CRUD action."""
    return UserPermissions(
        view=True,
        read=level,
        create=level,
        update=level,
        delete=level,
    )


def _deleteBestEffort(db, model, recordId):
    """Delete one test record without letting a failure abort the cleanup.

    Cleanup runs in a ``finally`` block, so an exception raised here would
    skip the remaining deletes and could mask the real test failure.
    Failures are therefore logged (with traceback) instead of re-raised, so
    leftover rows in the test database do not go unnoticed.
    """
    try:
        db.recordDelete(model, recordId)
    except Exception:
        logger.warning(
            "RBAC test cleanup: could not delete %s record %r",
            getattr(model, "__name__", model),
            recordId,
            exc_info=True,
        )


class TestRbacDatabaseFiltering:
    """Test RBAC database filtering."""

    def testBuildRbacWhereClauseAllAccess(self, db):
        """Test WHERE clause building for ALL access level."""
        permissions = _uniformPermissions(AccessLevel.ALL)
        user = User(
            id="test_user_all",
            username="testuser",
            roleLabels=["sysadmin"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # ALL access should return None (no filtering)
        assert whereClause is None

    def testBuildRbacWhereClauseMyAccess(self, db):
        """Test WHERE clause building for MY access level."""
        permissions = _uniformPermissions(AccessLevel.MY)
        user = User(
            id="test_user_my",
            username="testuser",
            roleLabels=["user"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # MY access is expected to filter on the creator column
        assert whereClause is not None
        assert whereClause["condition"] == '"sysCreatedBy" = %s'
        assert whereClause["values"] == ["test_user_my"]

    def testBuildRbacWhereClauseGroupAccess(self, db):
        """Test WHERE clause building for GROUP access level."""
        permissions = _uniformPermissions(AccessLevel.GROUP)
        mandateId = "test_mandate_group"
        user = User(
            id="test_user_group",
            username="testuser",
            roleLabels=["admin"],
        )

        whereClause = db.buildRbacWhereClause(
            permissions, user, "SomeTable", mandateId=mandateId
        )

        # GROUP access is expected to match the mandate or mandate-less rows
        assert whereClause is not None
        assert whereClause["condition"] == '("mandateId" = %s OR "mandateId" IS NULL)'
        assert whereClause["values"] == ["test_mandate_group"]

    def testBuildRbacWhereClauseNoAccess(self, db):
        """Test WHERE clause building for NONE access level."""
        permissions = _uniformPermissions(AccessLevel.NONE)
        user = User(
            id="test_user_none",
            username="testuser",
            roleLabels=["viewer"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        assert whereClause is not None
        assert whereClause["condition"] == "1 = 0"  # Always false
        assert whereClause["values"] == []

    def testBuildRbacWhereClauseUserInDBTable(self, db):
        """Test WHERE clause building for UserInDB table with MY access."""
        permissions = _uniformPermissions(AccessLevel.MY)
        user = User(
            id="test_user_in_db",
            username="testuser",
            roleLabels=["user"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "UserInDB")

        # UserInDB with MY access should filter by id field
        assert whereClause is not None
        assert whereClause["condition"] == '"id" = %s'
        assert whereClause["values"] == ["test_user_in_db"]

    def testBuildRbacWhereClauseUserConnectionTable(self, db):
        """GROUP on UserConnection resolves member userIds via UserMandate (multi-tenant).

        Creates one mandate, two users and their UserMandate memberships in
        the real database, builds the clause, and removes the rows again in
        the ``finally`` block regardless of the test outcome.
        """
        from modules.datamodels.datamodelUam import UserInDB, Mandate
        from modules.datamodels.datamodelMembership import UserMandate

        testMandateId = "rbac_test_mandate_uc"
        user1Id = "rbac_test_user_uc1"
        user2Id = "rbac_test_user_uc2"
        userMandateIds = []

        try:
            mandate = Mandate(
                id=testMandateId,
                name="rbac-test-mandate-uc",
                label="RBAC test",
            )
            mandatePayload = mandate.model_dump()
            mandatePayload["id"] = mandate.id
            db.recordCreate(Mandate, mandatePayload)

            for uid, uname in (
                (user1Id, "rbac_uc_user1"),
                (user2Id, "rbac_uc_user2"),
            ):
                u = UserInDB(
                    id=uid,
                    username=uname,
                    email=f"{uid}@example.com",
                    hashedPassword="not-used",
                )
                payload = u.model_dump()
                payload["id"] = u.id
                db.recordCreate(UserInDB, payload)

            for uid in (user1Id, user2Id):
                um = UserMandate(userId=uid, mandateId=testMandateId, enabled=True)
                umPayload = um.model_dump()
                umPayload["id"] = um.id
                createdUm = db.recordCreate(UserMandate, umPayload)
                # Prefer the id reported back by the connector; fall back to
                # the model's own id so the row can still be cleaned up.
                createdId = createdUm.get("id") if createdUm else None
                userMandateIds.append(createdId or um.id)

            permissions = _uniformPermissions(AccessLevel.GROUP)
            user = User(
                id=user1Id,
                username="rbac_uc_user1",
                roleLabels=["admin"],
            )

            whereClause = db.buildRbacWhereClause(
                permissions, user, "UserConnection", mandateId=testMandateId
            )

            assert whereClause is not None
            assert whereClause["condition"] != "1 = 0"
            assert "userId" in whereClause["condition"]
            assert "IN" in whereClause["condition"]
            assert set(whereClause["values"]) == {user1Id, user2Id}
        finally:
            # Remove memberships first, then users, then the mandate.
            for umId in userMandateIds:
                _deleteBestEffort(db, UserMandate, umId)
            for uid in (user1Id, user2Id):
                _deleteBestEffort(db, UserInDB, uid)
            _deleteBestEffort(db, Mandate, testMandateId)