gateway/scripts/_archive/script_db_migrate_accessrules_objectkeys.py
2026-04-29 21:27:08 +02:00

183 lines
6.3 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Migration Script: Migrate AccessRules to Vollqualifizierte ObjectKeys
This script migrates existing AccessRules in the database from short item names
(e.g., "dashboard", "positions") to fully qualified ObjectKeys
(e.g., "ui.feature.trustee.dashboard", "ui.feature.trustee.positions").
This is required for the Navigation-API-Konzept implementation.
Usage:
python script_db_migrate_accessrules_objectkeys.py [--dry-run]
Options:
--dry-run Show what would be changed without making actual changes
"""
import sys
import os
import logging
from typing import Dict, List, Any, Optional
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Mapping of short item names to fully qualified ObjectKeys per feature
MIGRATION_MAP: Dict[str, Dict[str, str]] = {
"trustee": {
# UI items
"dashboard": "ui.feature.trustee.dashboard",
"positions": "ui.feature.trustee.positions",
"documents": "ui.feature.trustee.documents",
"settings": "ui.feature.trustee.settings",
"instance-roles": "ui.feature.trustee.instance-roles",
# RESOURCE items
"instance-roles.manage": "resource.feature.trustee.instance-roles.manage",
},
"realestate": {
# UI items
"projects": "ui.feature.realestate.projects",
"parcels": "ui.feature.realestate.parcels",
# RESOURCE items
"project.create": "resource.feature.realestate.project.create",
"project.delete": "resource.feature.realestate.project.delete",
},
}
def migrateAccessRules(dryRun: bool = False) -> Dict[str, int]:
"""
Migrate AccessRules from short item names to fully qualified ObjectKeys.
Args:
dryRun: If True, don't make actual changes, just show what would be done
Returns:
Dictionary with migration statistics
"""
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
stats = {
"total_rules": 0,
"migrated": 0,
"already_qualified": 0,
"skipped_null": 0,
"errors": 0,
}
try:
rootInterface = getRootInterface()
db = rootInterface.db
# Get all AccessRules
allRules = db.getRecordset(AccessRule, recordFilter=None)
stats["total_rules"] = len(allRules)
logger.info(f"Found {len(allRules)} AccessRules to check")
for rule in allRules:
ruleId = rule.get("id")
context = rule.get("context")
item = rule.get("item")
roleId = rule.get("roleId")
# Skip rules without item (wildcard rules)
if item is None:
stats["skipped_null"] += 1
continue
# Skip if already fully qualified
if item.startswith("ui.") or item.startswith("resource.") or item.startswith("data."):
stats["already_qualified"] += 1
continue
# Get the role to determine feature code
roles = db.getRecordset(Role, recordFilter={"id": roleId})
if not roles or len(roles) == 0:
logger.warning(f"Rule {ruleId}: Role {roleId} not found, skipping")
stats["errors"] += 1
continue
role = roles[0]
featureCode = role.get("featureCode")
if not featureCode or featureCode not in MIGRATION_MAP:
logger.debug(f"Rule {ruleId}: Feature '{featureCode}' not in migration map, skipping")
continue
# Lookup the new ObjectKey
featureMap = MIGRATION_MAP[featureCode]
if item not in featureMap:
logger.warning(f"Rule {ruleId}: Item '{item}' not in migration map for feature '{featureCode}'")
stats["errors"] += 1
continue
newItem = featureMap[item]
if dryRun:
logger.info(f"[DRY-RUN] Would migrate rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
else:
# Update the rule using recordModify
try:
db.recordModify(AccessRule, ruleId, {"item": newItem})
logger.info(f"Migrated rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
except Exception as e:
logger.error(f"Failed to migrate rule {ruleId}: {e}")
stats["errors"] += 1
continue
stats["migrated"] += 1
return stats
except Exception as e:
logger.error(f"Migration failed: {e}", exc_info=True)
raise
def main():
"""Main entry point."""
dryRun = "--dry-run" in sys.argv
forceRun = "--force" in sys.argv
if dryRun:
logger.info("=" * 60)
logger.info("DRY RUN MODE - No changes will be made")
logger.info("=" * 60)
else:
logger.info("=" * 60)
logger.info("LIVE MODE - Changes will be applied to the database")
logger.info("=" * 60)
if not forceRun:
# Confirm before proceeding
confirm = input("Are you sure you want to proceed? (yes/no): ")
if confirm.lower() != "yes":
logger.info("Migration cancelled")
return
try:
stats = migrateAccessRules(dryRun=dryRun)
logger.info("=" * 60)
logger.info("Migration completed!")
logger.info(f" Total rules checked: {stats['total_rules']}")
logger.info(f" Rules migrated: {stats['migrated']}")
logger.info(f" Already qualified: {stats['already_qualified']}")
logger.info(f" Skipped (null item): {stats['skipped_null']}")
logger.info(f" Errors: {stats['errors']}")
logger.info("=" * 60)
except Exception as e:
logger.error(f"Migration failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()