gateway/tests/integration/rbac/test_rbac_database.py
ValueOn AG 75484c0f73 BREAKING CHANGE
API and persisted records use PowerOnModel system fields:
- sysCreatedAt, sysCreatedBy, sysModifiedAt, sysModifiedBy
Removed legacy JSON/DB field names:
- _createdAt, _createdBy, _modifiedAt, _modifiedBy
Frontend (frontend_nyla) and gateway call sites were updated accordingly.
Database:
- Bootstrap runs idempotent backfill (_migrateSystemFieldColumns) from old
  underscore columns and selected business duplicates into sys* where sys* IS NULL.
- Re-run app bootstrap against each PostgreSQL database after deploy.
- Optional: DROP INDEX IF EXISTS "idx_invitation_createdby" if an old index remains;
  new index: idx_invitation_syscreatedby on Invitation(sysCreatedBy).
Tests:
- RBAC integration tests aligned with current GROUP mandate filter and UserMandate-based
  UserConnection GROUP clause; buildRbacWhereClause(..., mandateId=...) must be passed
  explicitly (same as production request context).
2026-03-28 18:12:37 +01:00

237 lines
7.7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Integration tests for RBAC database filtering.
Tests that database queries correctly filter records based on RBAC rules.
Uses real database connection for integration testing.
"""
import pytest
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelUam import User, AccessLevel, UserPermissions
from modules.shared.configuration import APP_CONFIG
@pytest.fixture(scope="class")
def db():
    """Yield a real DatabaseConnector for the integration tests, then close it.

    Connection settings are read from APP_CONFIG; each lookup falls back to
    a local default (host "localhost", database "poweron_test", user
    "postgres", empty password, port 5432).
    """
    connector = DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_test"),
        dbUser=APP_CONFIG.get("DB_USER", "postgres"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD", ""),
        dbPort=APP_CONFIG.get("DB_PORT", 5432),
    )
    yield connector
    # Teardown half of the fixture: release the connection after use.
    connector.close()
class TestRbacDatabaseFiltering:
    """Checks the WHERE clauses returned by ``db.buildRbacWhereClause``."""

    @staticmethod
    def _uniformPermissions(level):
        """Return UserPermissions with view on and every CRUD action at *level*."""
        return UserPermissions(
            view=True,
            read=level,
            create=level,
            update=level,
            delete=level,
        )

    def testBuildRbacWhereClauseAllAccess(self, db):
        """ALL access level: no WHERE clause is produced."""
        perms = self._uniformPermissions(AccessLevel.ALL)
        actor = User(
            id="test_user_all",
            username="testuser",
            roleLabels=["sysadmin"],
        )

        clause = db.buildRbacWhereClause(perms, actor, "SomeTable")

        # None means the query is left unfiltered.
        assert clause is None

    def testBuildRbacWhereClauseMyAccess(self, db):
        """MY access level: rows are restricted to those the user created."""
        perms = self._uniformPermissions(AccessLevel.MY)
        actor = User(
            id="test_user_my",
            username="testuser",
            roleLabels=["user"],
        )

        clause = db.buildRbacWhereClause(perms, actor, "SomeTable")

        assert clause is not None
        assert clause["condition"] == '"sysCreatedBy" = %s'
        assert clause["values"] == ["test_user_my"]

    def testBuildRbacWhereClauseGroupAccess(self, db):
        """GROUP access level: rows are restricted by the explicitly passed mandate."""
        perms = self._uniformPermissions(AccessLevel.GROUP)
        mandateId = "test_mandate_group"
        actor = User(
            id="test_user_group",
            username="testuser",
            roleLabels=["admin"],
        )

        clause = db.buildRbacWhereClause(
            perms, actor, "SomeTable", mandateId=mandateId
        )

        assert clause is not None
        assert clause["condition"] == '("mandateId" = %s OR "mandateId" IS NULL)'
        assert clause["values"] == ["test_mandate_group"]

    def testBuildRbacWhereClauseNoAccess(self, db):
        """NONE access level: the clause can never match a row."""
        perms = self._uniformPermissions(AccessLevel.NONE)
        actor = User(
            id="test_user_none",
            username="testuser",
            roleLabels=["viewer"],
        )

        clause = db.buildRbacWhereClause(perms, actor, "SomeTable")

        assert clause is not None
        # A constant-false predicate with no bound parameters.
        assert clause["condition"] == "1 = 0"
        assert clause["values"] == []

    def testBuildRbacWhereClauseUserInDBTable(self, db):
        """MY access on the UserInDB table: the filter targets the id column."""
        perms = self._uniformPermissions(AccessLevel.MY)
        actor = User(
            id="test_user_in_db",
            username="testuser",
            roleLabels=["user"],
        )

        clause = db.buildRbacWhereClause(perms, actor, "UserInDB")

        # For UserInDB the user's own id is matched instead of sysCreatedBy.
        assert clause is not None
        assert clause["condition"] == '"id" = %s'
        assert clause["values"] == ["test_user_in_db"]

    def testBuildRbacWhereClauseUserConnectionTable(self, db):
        """GROUP access on UserConnection: userId IN (users linked to the mandate).

        Creates a mandate, two users and a UserMandate row per user, then
        expects the clause values to be exactly those two user ids. All
        created records are deleted afterwards on a best-effort basis.
        """
        from modules.datamodels.datamodelUam import UserInDB, Mandate
        from modules.datamodels.datamodelMembership import UserMandate

        testMandateId = "rbac_test_mandate_uc"
        user1Id = "rbac_test_user_uc1"
        user2Id = "rbac_test_user_uc2"
        userMandateIds = []

        try:
            # --- setup: mandate -> users -> memberships ---
            mandate = Mandate(
                id=testMandateId,
                name="RBAC test mandate",
                label="RBAC test",
            )
            db.recordCreate(Mandate, {**mandate.model_dump(), "id": mandate.id})

            userSpecs = (
                (user1Id, "rbac_uc_user1"),
                (user2Id, "rbac_uc_user2"),
            )
            for userId, userName in userSpecs:
                record = UserInDB(
                    id=userId,
                    username=userName,
                    email=f"{userId}@example.com",
                    hashedPassword="not-used",
                )
                db.recordCreate(UserInDB, {**record.model_dump(), "id": record.id})

            for userId in (user1Id, user2Id):
                membership = UserMandate(
                    userId=userId, mandateId=testMandateId, enabled=True
                )
                created = db.recordCreate(
                    UserMandate, {**membership.model_dump(), "id": membership.id}
                )
                # Prefer the id reported back by the create call; otherwise
                # fall back to the id generated on the model.
                userMandateIds.append(
                    created["id"]
                    if created and created.get("id")
                    else membership.id
                )

            # --- exercise ---
            perms = self._uniformPermissions(AccessLevel.GROUP)
            actor = User(
                id=user1Id,
                username="rbac_uc_user1",
                roleLabels=["admin"],
            )
            clause = db.buildRbacWhereClause(
                perms, actor, "UserConnection", mandateId=testMandateId
            )

            # --- verify ---
            assert clause is not None
            assert clause["condition"] != "1 = 0"
            assert "userId" in clause["condition"]
            assert "IN" in clause["condition"]
            assert set(clause["values"]) == {user1Id, user2Id}
        finally:
            # Delete in reverse dependency order: memberships, users, mandate.
            cleanupTargets = [(UserMandate, umId) for umId in userMandateIds]
            cleanupTargets += [(UserInDB, uid) for uid in (user1Id, user2Id)]
            cleanupTargets.append((Mandate, testMandateId))
            for model, recordId in cleanupTargets:
                try:
                    db.recordDelete(model, recordId)
                except Exception:
                    # Best-effort cleanup: a failed delete must not mask
                    # the test outcome or stop the remaining deletes.
                    pass