gateway/tests/integration/rbac/test_rbac_database.py
2025-12-15 21:55:26 +01:00

211 lines
6.8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Integration tests for RBAC database filtering.
Tests that database queries correctly filter records based on RBAC rules.
Uses real database connection for integration testing.
"""
import pytest
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelUam import User, AccessLevel, UserPermissions
from modules.shared.configuration import APP_CONFIG
@pytest.fixture(scope="class")
def db():
    """Yield a real ``DatabaseConnector`` for the integration tests.

    Connection settings are read from ``APP_CONFIG``, falling back to local
    test defaults. The fixture is class-scoped, and the connector is closed
    in the teardown step that follows the ``yield``.
    """
    # Keys mirror the keyword arguments accepted by DatabaseConnector.
    connectionSettings = {
        "dbHost": APP_CONFIG.get("DB_HOST", "localhost"),
        "dbDatabase": APP_CONFIG.get("DB_DATABASE", "poweron_test"),
        "dbUser": APP_CONFIG.get("DB_USER", "postgres"),
        "dbPassword": APP_CONFIG.get("DB_PASSWORD", ""),
        "dbPort": APP_CONFIG.get("DB_PORT", 5432),
    }
    connector = DatabaseConnector(**connectionSettings)
    yield connector
    # Teardown: release the connection after the tests that used it.
    connector.close()
class TestRbacDatabaseFiltering:
    """Test RBAC database filtering.

    Every test builds a ``UserPermissions``/``User`` pair, asks the ``db``
    fixture's ``buildRbacWhereClause`` for the filter of a given table and
    checks the returned clause (``None`` or a dict with ``"condition"`` and
    ``"values"`` keys).
    """

    @staticmethod
    def _uniformPermissions(level):
        """Return view-enabled permissions using ``level`` for every CRUD action."""
        return UserPermissions(
            view=True,
            read=level,
            create=level,
            update=level,
            delete=level,
        )

    def testBuildRbacWhereClauseAllAccess(self, db):
        """Test WHERE clause building for ALL access level."""
        permissions = self._uniformPermissions(AccessLevel.ALL)
        user = User(
            id="test_user_all",
            username="testuser",
            roleLabels=["sysadmin"],
            mandateId="test_mandate_all",
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # ALL access should return None (no filtering)
        assert whereClause is None

    def testBuildRbacWhereClauseMyAccess(self, db):
        """Test WHERE clause building for MY access level."""
        permissions = self._uniformPermissions(AccessLevel.MY)
        user = User(
            id="test_user_my",
            username="testuser",
            roleLabels=["user"],
            mandateId="test_mandate_my",
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # MY access is expected to restrict rows to the user's own records.
        assert whereClause is not None
        assert whereClause["condition"] == '"_createdBy" = %s'
        assert whereClause["values"] == ["test_user_my"]

    def testBuildRbacWhereClauseGroupAccess(self, db):
        """Test WHERE clause building for GROUP access level."""
        permissions = self._uniformPermissions(AccessLevel.GROUP)
        user = User(
            id="test_user_group",
            username="testuser",
            roleLabels=["admin"],
            mandateId="test_mandate_group",
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # GROUP access is expected to restrict rows to the user's mandate.
        assert whereClause is not None
        assert whereClause["condition"] == '"mandateId" = %s'
        assert whereClause["values"] == ["test_mandate_group"]

    def testBuildRbacWhereClauseNoAccess(self, db):
        """Test WHERE clause building for NONE access level."""
        permissions = self._uniformPermissions(AccessLevel.NONE)
        user = User(
            id="test_user_none",
            username="testuser",
            roleLabels=["viewer"],
            mandateId="test_mandate_none",
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        assert whereClause is not None
        assert whereClause["condition"] == "1 = 0"  # Always false
        assert whereClause["values"] == []

    def testBuildRbacWhereClauseUserInDBTable(self, db):
        """Test WHERE clause building for UserInDB table with MY access."""
        permissions = self._uniformPermissions(AccessLevel.MY)
        user = User(
            id="test_user_in_db",
            username="testuser",
            roleLabels=["user"],
            mandateId="test_mandate_in_db",
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "UserInDB")

        # UserInDB with MY access should filter by id field
        assert whereClause is not None
        assert whereClause["condition"] == '"id" = %s'
        assert whereClause["values"] == ["test_user_in_db"]

    def testBuildRbacWhereClauseUserConnectionTable(self, db):
        """Test WHERE clause building for UserConnection table with GROUP access.

        Two ``UserInDB`` records sharing one mandate are written to the real
        database first, and are removed again in the ``finally`` block whether
        or not the assertions pass.
        """
        import logging

        from modules.datamodels.datamodelUam import UserInDB

        testMandateId = "test_mandate_group"

        # Create test users in the same mandate for GROUP access testing
        testUsers = [
            UserInDB(
                id="test_user1",
                username="testuser1",
                mandateId=testMandateId,
            ),
            UserInDB(
                id="test_user2",
                username="testuser2",
                mandateId=testMandateId,
            ),
        ]

        try:
            for testUser in testUsers:
                record = testUser.model_dump()
                # Set the id explicitly so the record is stored under the known test id.
                record["id"] = testUser.id
                db.recordCreate(UserInDB, record)

            permissions = self._uniformPermissions(AccessLevel.GROUP)
            user = User(
                id="test_user1",
                username="testuser1",
                roleLabels=["admin"],
                mandateId=testMandateId,
            )

            whereClause = db.buildRbacWhereClause(permissions, user, "UserConnection")

            assert whereClause is not None
            assert "userId" in whereClause["condition"]
            assert "IN" in whereClause["condition"]
            assert len(whereClause["values"]) >= 2
        finally:
            # Cleanup test users. Each delete is attempted on its own so that a
            # failure for one user does not leave the other behind. Cleanup is
            # best-effort: failures are logged, never raised, so they cannot
            # mask the real test outcome.
            for userId in ("test_user1", "test_user2"):
                try:
                    db.recordDelete(UserInDB, userId)
                except Exception as cleanupError:
                    logging.getLogger(__name__).warning(
                        "Could not delete test user %s during cleanup: %s",
                        userId,
                        cleanupError,
                    )