237 lines
7.7 KiB
Python
237 lines
7.7 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Integration tests for RBAC database filtering.

Tests that database queries correctly filter records based on RBAC rules.
Uses a real database connection for integration testing.
"""
|
|
|
|
import logging

import pytest

from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelUam import User, AccessLevel, UserPermissions
from modules.shared.configuration import APP_CONFIG
|
|
|
|
|
|
@pytest.fixture(scope="class")
def db():
    """Yield a real DatabaseConnector for the integration tests.

    Connection settings are looked up in APP_CONFIG; each lookup carries a
    fallback value (localhost / poweron_test / postgres / empty password /
    5432). The fixture is class-scoped, so one connector is shared by all
    tests of the requesting class and closed after they have finished.
    """
    connector = DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_test"),
        dbUser=APP_CONFIG.get("DB_USER", "postgres"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD", ""),
        dbPort=APP_CONFIG.get("DB_PORT", 5432),
    )
    yield connector
    # Teardown: executed by pytest once the fixture goes out of scope.
    connector.close()
|
|
|
|
|
|
class TestRbacDatabaseFiltering:
    """Integration tests for the connector's buildRbacWhereClause method.

    Each test builds a UserPermissions / User pair, asks the connector for
    the RBAC WHERE clause of a table and checks the returned condition and
    bound values.
    """

    @staticmethod
    def _uniformPermissions(level):
        """Return UserPermissions with view=True and `level` for read/create/update/delete."""
        return UserPermissions(
            view=True,
            read=level,
            create=level,
            update=level,
            delete=level,
        )

    @staticmethod
    def _deleteBestEffort(db, model, recordId):
        """Delete one record during cleanup; log (never raise) when that fails.

        Cleanup must not mask the real test outcome, so failures are still
        tolerated, but they are reported so that leftover rows from a broken
        cleanup can be diagnosed.
        """
        try:
            db.recordDelete(model, recordId)
        except Exception:
            logging.getLogger(__name__).warning(
                "RBAC test cleanup: could not delete %s record %r",
                getattr(model, "__name__", model),
                recordId,
                exc_info=True,
            )

    def testBuildRbacWhereClauseAllAccess(self, db):
        """Test WHERE clause building for ALL access level."""
        permissions = self._uniformPermissions(AccessLevel.ALL)
        user = User(
            id="test_user_all",
            username="testuser",
            roleLabels=["sysadmin"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # ALL access should return None (no filtering)
        assert whereClause is None

    def testBuildRbacWhereClauseMyAccess(self, db):
        """Test WHERE clause building for MY access level."""
        permissions = self._uniformPermissions(AccessLevel.MY)
        user = User(
            id="test_user_my",
            username="testuser",
            roleLabels=["user"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        # MY access is expected to restrict rows to those created by the user.
        assert whereClause is not None
        assert whereClause["condition"] == '"sysCreatedBy" = %s'
        assert whereClause["values"] == ["test_user_my"]

    def testBuildRbacWhereClauseGroupAccess(self, db):
        """Test WHERE clause building for GROUP access level."""
        permissions = self._uniformPermissions(AccessLevel.GROUP)
        mandateId = "test_mandate_group"
        user = User(
            id="test_user_group",
            username="testuser",
            roleLabels=["admin"],
        )

        whereClause = db.buildRbacWhereClause(
            permissions, user, "SomeTable", mandateId=mandateId
        )

        # GROUP access is expected to match the mandate or rows without one.
        assert whereClause is not None
        assert whereClause["condition"] == '("mandateId" = %s OR "mandateId" IS NULL)'
        assert whereClause["values"] == ["test_mandate_group"]

    def testBuildRbacWhereClauseNoAccess(self, db):
        """Test WHERE clause building for NONE access level."""
        permissions = self._uniformPermissions(AccessLevel.NONE)
        user = User(
            id="test_user_none",
            username="testuser",
            roleLabels=["viewer"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "SomeTable")

        assert whereClause is not None
        assert whereClause["condition"] == "1 = 0"  # Always false
        assert whereClause["values"] == []

    def testBuildRbacWhereClauseUserInDBTable(self, db):
        """Test WHERE clause building for UserInDB table with MY access."""
        permissions = self._uniformPermissions(AccessLevel.MY)
        user = User(
            id="test_user_in_db",
            username="testuser",
            roleLabels=["user"],
        )

        whereClause = db.buildRbacWhereClause(permissions, user, "UserInDB")

        # UserInDB with MY access should filter by id field
        assert whereClause is not None
        assert whereClause["condition"] == '"id" = %s'
        assert whereClause["values"] == ["test_user_in_db"]

    def testBuildRbacWhereClauseUserConnectionTable(self, db):
        """GROUP on UserConnection resolves member userIds via UserMandate (multi-tenant).

        Creates a mandate, two users and their UserMandate memberships in the
        real database, checks that the WHERE clause is a userId IN (...)
        filter over exactly those two users, and removes the created rows
        afterwards regardless of the test outcome.
        """
        from modules.datamodels.datamodelUam import UserInDB, Mandate
        from modules.datamodels.datamodelMembership import UserMandate

        testMandateId = "rbac_test_mandate_uc"
        user1Id = "rbac_test_user_uc1"
        user2Id = "rbac_test_user_uc2"
        # Ids of the membership rows actually created, for cleanup.
        userMandateIds = []

        try:
            mandate = Mandate(
                id=testMandateId,
                name="rbac-test-mandate-uc",
                label="RBAC test",
            )
            mandatePayload = mandate.model_dump()
            mandatePayload["id"] = mandate.id
            db.recordCreate(Mandate, mandatePayload)

            for uid, uname in (
                (user1Id, "rbac_uc_user1"),
                (user2Id, "rbac_uc_user2"),
            ):
                u = UserInDB(
                    id=uid,
                    username=uname,
                    email=f"{uid}@example.com",
                    hashedPassword="not-used",
                )
                payload = u.model_dump()
                payload["id"] = u.id
                db.recordCreate(UserInDB, payload)

            for uid in (user1Id, user2Id):
                um = UserMandate(userId=uid, mandateId=testMandateId, enabled=True)
                umPayload = um.model_dump()
                umPayload["id"] = um.id
                createdUm = db.recordCreate(UserMandate, umPayload)
                # Prefer the id reported back by the database; fall back to
                # the model's own id when the create call returns none.
                if createdUm and createdUm.get("id"):
                    userMandateIds.append(createdUm["id"])
                else:
                    userMandateIds.append(um.id)

            permissions = self._uniformPermissions(AccessLevel.GROUP)
            user = User(
                id=user1Id,
                username="rbac_uc_user1",
                roleLabels=["admin"],
            )

            whereClause = db.buildRbacWhereClause(
                permissions, user, "UserConnection", mandateId=testMandateId
            )

            assert whereClause is not None
            assert whereClause["condition"] != "1 = 0"
            assert "userId" in whereClause["condition"]
            assert "IN" in whereClause["condition"]
            assert set(whereClause["values"]) == {user1Id, user2Id}
        finally:
            # Remove memberships first, then the users, then the mandate.
            for umId in userMandateIds:
                self._deleteBestEffort(db, UserMandate, umId)
            for uid in (user1Id, user2Id):
                self._deleteBestEffort(db, UserInDB, uid)
            self._deleteBestEffort(db, Mandate, testMandateId)
|