gateway/tests/integration/rbac/test_rbac_migration.py
2025-12-07 13:48:39 +01:00

282 lines
11 KiB
Python

"""
Integration tests for UAM to RBAC migration.
Tests that migration correctly converts user privileges to roleLabels.
Uses a real database connection for integration testing.
"""
import pytest
from modules.migration.migrateUamToRbac import migrateUamToRbac, validateMigration
from modules.datamodels.datamodelUam import UserInDB, UserPrivilege
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
@pytest.fixture(scope="class")
def db():
    """Yield a ``DatabaseConnector`` configured from ``APP_CONFIG``.

    Each connection setting is looked up in ``APP_CONFIG`` and falls back to
    the default given below when the key is absent. The fixture is
    class-scoped: the connector is created once per requesting test class and
    closed after that class's tests have run.
    """
    connector = DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
        dbDatabase=APP_CONFIG.get("DB_DATABASE", "poweron_test"),
        dbUser=APP_CONFIG.get("DB_USER", "postgres"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD", ""),
        dbPort=APP_CONFIG.get("DB_PORT", 5432),
    )
    yield connector
    # Teardown: executed by pytest once the fixture goes out of scope.
    connector.close()
class TestRbacMigration:
    """Integration tests for the UAM-to-RBAC migration.

    Every test inserts its own ``UserInDB`` records through the ``db``
    fixture, runs ``migrateUamToRbac`` or ``validateMigration`` against the
    database, and removes its records again in a ``finally`` block.

    NOTE(review): the exact ``usersUpdated`` counts asserted below presumably
    rely on the test database containing no other un-migrated users while the
    tests run -- confirm against the test environment setup.
    """

    # ------------------------------------------------------------------
    # Private helpers (not collected by pytest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _toRecord(user):
        """Return the dict to store for ``user``, with ``id`` set explicitly."""
        record = user.model_dump()
        record["id"] = user.id
        return record

    @staticmethod
    def _fetchUser(db, userId):
        """Return the stored record with id ``userId``, or ``None`` if absent."""
        records = db.getRecordset(UserInDB, recordFilter={"id": userId})
        return records[0] if records else None

    @classmethod
    def _assertHasRole(cls, db, userId, role):
        """Assert that user ``userId`` exists and carries ``role`` in roleLabels."""
        record = cls._fetchUser(db, userId)
        assert record is not None, f"user {userId!r} not found in database"
        # ``or []`` turns a stored None into a clean assertion failure instead
        # of a TypeError from ``in None``.
        roleLabels = record.get("roleLabels") or []
        assert role in roleLabels, (
            f"user {userId!r}: expected role {role!r} in roleLabels {roleLabels!r}"
        )

    @staticmethod
    def _deleteUsers(db, userIds):
        """Best-effort removal of the given test users.

        Each record is deleted in its own ``try`` so that one failing delete
        cannot leave the remaining test users behind. Failures are logged
        rather than raised so cleanup never masks the real test outcome.
        """
        import logging

        logger = logging.getLogger(__name__)
        for userId in userIds:
            try:
                db.recordDelete(UserInDB, userId)
            except Exception:
                logger.warning(
                    "Cleanup of test user %r failed", userId, exc_info=True
                )

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def testMigrateUserPrivilegeToRoleLabels(self, db):
        """Test that user privileges are correctly converted to roleLabels."""
        # Users with a privilege but no roleLabels yet.
        testUsers = [
            UserInDB(
                id="migrate_test_user1",
                username="migrate_admin",
                privilege=UserPrivilege.SYSADMIN.value,
            ),
            UserInDB(
                id="migrate_test_user2",
                username="migrate_admin2",
                privilege=UserPrivilege.ADMIN.value,
            ),
            UserInDB(
                id="migrate_test_user3",
                username="migrate_user1",
                privilege=UserPrivilege.USER.value,
            ),
        ]
        expectedRoles = {
            "migrate_test_user1": "sysadmin",
            "migrate_test_user2": "admin",
            "migrate_test_user3": "user",
        }
        try:
            for user in testUsers:
                userData = self._toRecord(user)
                # Store an empty roleLabels list so the migration has work to do.
                userData["roleLabels"] = []
                db.recordCreate(UserInDB, userData)

            results = migrateUamToRbac(db, dryRun=False)

            # All three freshly created users must be reported as updated.
            assert results["usersUpdated"] == 3

            # The change must also be visible in the database itself.
            for userId, role in expectedRoles.items():
                self._assertHasRole(db, userId, role)
        finally:
            self._deleteUsers(db, [user.id for user in testUsers])

    def testMigrationSkipsUsersWithExistingRoleLabels(self, db):
        """Test that migration skips users who already have roleLabels."""
        migratedUser = UserInDB(
            id="skip_test_user1",
            username="skip_admin",
            privilege=UserPrivilege.SYSADMIN.value,
            roleLabels=["sysadmin"],  # Already migrated
        )
        pendingUser = UserInDB(
            id="skip_test_user2",
            username="skip_user1",
            privilege=UserPrivilege.USER.value,
            roleLabels=[],  # Needs migration
        )
        try:
            db.recordCreate(UserInDB, self._toRecord(migratedUser))
            db.recordCreate(UserInDB, self._toRecord(pendingUser))

            results = migrateUamToRbac(db, dryRun=False)

            # Only the user without roleLabels should be updated.
            assert results["usersUpdated"] == 1

            # The already-migrated user keeps its original role ...
            self._assertHasRole(db, "skip_test_user1", "sysadmin")
            # ... and the pending user received the role for its privilege.
            self._assertHasRole(db, "skip_test_user2", "user")
        finally:
            self._deleteUsers(db, ["skip_test_user1", "skip_test_user2"])

    def testDryRunMode(self, db):
        """Test that dry run mode doesn't make changes."""
        testUser = UserInDB(
            id="dryrun_test_user1",
            username="dryrun_admin",
            privilege=UserPrivilege.SYSADMIN.value,
            roleLabels=[],  # Needs migration
        )
        try:
            db.recordCreate(UserInDB, self._toRecord(testUser))

            # Snapshot the stored state before the dry run.
            originalUser = self._fetchUser(db, "dryrun_test_user1")
            assert originalUser is not None
            originalRoleLabels = originalUser.get("roleLabels", [])

            results = migrateUamToRbac(db, dryRun=True)

            # The dry run still reports what it would have done ...
            assert results["usersUpdated"] == 1

            # ... but the stored record must be unchanged.
            unchangedUser = self._fetchUser(db, "dryrun_test_user1")
            assert unchangedUser is not None
            assert unchangedUser.get("roleLabels", []) == originalRoleLabels
        finally:
            self._deleteUsers(db, ["dryrun_test_user1"])

    def testValidateMigrationSuccess(self, db):
        """Test validation passes when migration is successful."""
        # Users that already carry roleLabels (i.e. already migrated).
        testUsers = [
            UserInDB(
                id="validate_test_user1",
                username="validate_admin",
                privilege=UserPrivilege.SYSADMIN.value,
                roleLabels=["sysadmin"],
            ),
            UserInDB(
                id="validate_test_user2",
                username="validate_admin2",
                privilege=UserPrivilege.ADMIN.value,
                roleLabels=["admin"],
            ),
        ]
        try:
            for user in testUsers:
                db.recordCreate(UserInDB, self._toRecord(user))

            # Make sure the AccessRule table exists before validating.
            from modules.datamodels.datamodelRbac import AccessRule
            db._ensureTableExists(AccessRule)

            validation = validateMigration(db)

            assert validation["valid"] is True
            assert len(validation["issues"]) == 0
        finally:
            self._deleteUsers(db, [user.id for user in testUsers])

    def testValidateMigrationFailsWithoutRoleLabels(self, db):
        """Test validation fails when users don't have roleLabels."""
        # One user with roleLabels, one with an empty list, and one whose
        # stored record has the roleLabels field removed entirely.
        testUsers = [
            UserInDB(
                id="validate_fail_user1",
                username="validate_fail_admin",
                privilege=UserPrivilege.SYSADMIN.value,
                roleLabels=["sysadmin"],  # Has roleLabels
            ),
            UserInDB(
                id="validate_fail_user2",
                username="validate_fail_user",
                privilege=UserPrivilege.USER.value,
                roleLabels=[],  # Empty roleLabels
            ),
            UserInDB(
                id="validate_fail_user3",
                username="validate_fail_user2",
                privilege=UserPrivilege.USER.value,
                # roleLabels not passed; the key is removed before storing
            ),
        ]
        try:
            for user in testUsers:
                userData = self._toRecord(user)
                if user.id == "validate_fail_user3":
                    # Drop the field so the record is stored without roleLabels.
                    userData.pop("roleLabels", None)
                db.recordCreate(UserInDB, userData)

            # Make sure the AccessRule table exists before validating.
            from modules.datamodels.datamodelRbac import AccessRule
            db._ensureTableExists(AccessRule)

            validation = validateMigration(db)

            assert validation["valid"] is False
            assert len(validation["issues"]) > 0

            # At least one reported issue must mention the missing roleLabels.
            issuesStr = " ".join(validation["issues"])
            assert "without roleLabels" in issuesStr
        finally:
            self._deleteUsers(db, [user.id for user in testUsers])