#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Migration Script: Migrate AccessRules to fully qualified ObjectKeys

This script migrates existing AccessRules in the database from short item
names (e.g., "dashboard", "positions") to fully qualified ObjectKeys
(e.g., "ui.feature.trustee.dashboard", "ui.feature.trustee.positions").

This is required for the implementation of the Navigation API concept
("Navigation-API-Konzept").

Usage:
    python script_db_migrate_accessrules_objectkeys.py [--dry-run] [--force]

Options:
    --dry-run    Show what would be changed without making actual changes
    --force      Skip the interactive confirmation prompt in live mode
"""

import sys
import os
import logging
from typing import Dict, List, Any, Optional

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Mapping of short item names to fully qualified ObjectKeys per feature
MIGRATION_MAP: Dict[str, Dict[str, str]] = {
    "trustee": {
        # UI items
        "dashboard": "ui.feature.trustee.dashboard",
        "positions": "ui.feature.trustee.positions",
        "documents": "ui.feature.trustee.documents",
        "settings": "ui.feature.trustee.settings",
        "instance-roles": "ui.feature.trustee.instance-roles",
        # RESOURCE items
        "instance-roles.manage": "resource.feature.trustee.instance-roles.manage",
    },
    "realestate": {
        # UI items
        "projects": "ui.feature.realestate.projects",
        "parcels": "ui.feature.realestate.parcels",
        # RESOURCE items
        "project.create": "resource.feature.realestate.project.create",
        "project.delete": "resource.feature.realestate.project.delete",
    },
}

# An item starting with one of these prefixes is treated as already migrated.
_QUALIFIED_PREFIXES = ("ui.", "resource.", "data.")


def migrateAccessRules(dryRun: bool = False) -> Dict[str, int]:
    """
    Migrate AccessRules from short item names to fully qualified ObjectKeys.

    Every AccessRule is inspected once. Rules without an item, rules whose
    item already starts with a qualified prefix, and rules whose role belongs
    to a feature that is not listed in MIGRATION_MAP are left untouched.

    Args:
        dryRun: If True, don't make actual changes, just show what would be done

    Returns:
        Dictionary with migration statistics. Keys: "total_rules", "migrated"
        (in dry-run mode: rules that would be migrated), "already_qualified",
        "skipped_null", "skipped_unmapped_feature" and "errors".

    Raises:
        Exception: Any error raised outside the per-rule update (e.g. while
            obtaining the interface or reading records) is logged and re-raised.
    """
    # Project modules are imported at call time, i.e. after the module-level
    # sys.path adjustment has been executed.
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelRbac import Role, AccessRule

    stats = {
        "total_rules": 0,
        "migrated": 0,
        "already_qualified": 0,
        "skipped_null": 0,
        "skipped_unmapped_feature": 0,
        "errors": 0,
    }

    # roleId -> role record (or None when the role does not exist).
    # Many rules share a role, so each role is fetched at most once.
    roleCache: Dict[Any, Any] = {}

    try:
        rootInterface = getRootInterface()
        db = rootInterface.db

        # Get all AccessRules
        allRules = db.getRecordset(AccessRule, recordFilter=None)
        stats["total_rules"] = len(allRules)
        logger.info(f"Found {len(allRules)} AccessRules to check")

        for rule in allRules:
            ruleId = rule.get("id")
            item = rule.get("item")
            roleId = rule.get("roleId")

            # Skip rules without item (wildcard rules)
            if item is None:
                stats["skipped_null"] += 1
                continue

            # Skip if already fully qualified
            if item.startswith(_QUALIFIED_PREFIXES):
                stats["already_qualified"] += 1
                continue

            # Get the role to determine feature code
            if roleId not in roleCache:
                roles = db.getRecordset(Role, recordFilter={"id": roleId})
                roleCache[roleId] = roles[0] if roles else None
            role = roleCache[roleId]

            if role is None:
                logger.warning(f"Rule {ruleId}: Role {roleId} not found, skipping")
                stats["errors"] += 1
                continue

            featureCode = role.get("featureCode")
            if not featureCode or featureCode not in MIGRATION_MAP:
                # Not an error: this feature simply has no mapping defined.
                logger.debug(f"Rule {ruleId}: Feature '{featureCode}' not in migration map, skipping")
                stats["skipped_unmapped_feature"] += 1
                continue

            # Lookup the new ObjectKey
            featureMap = MIGRATION_MAP[featureCode]
            if item not in featureMap:
                logger.warning(f"Rule {ruleId}: Item '{item}' not in migration map for feature '{featureCode}'")
                stats["errors"] += 1
                continue

            newItem = featureMap[item]

            if dryRun:
                logger.info(f"[DRY-RUN] Would migrate rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
            else:
                # Update the rule using recordModify; a failure on one rule
                # is counted and must not abort the remaining rules.
                try:
                    db.recordModify(AccessRule, ruleId, {"item": newItem})
                    logger.info(f"Migrated rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
                except Exception as e:
                    logger.error(f"Failed to migrate rule {ruleId}: {e}")
                    stats["errors"] += 1
                    continue

            stats["migrated"] += 1

        return stats

    except Exception as e:
        logger.error(f"Migration failed: {e}", exc_info=True)
        raise


def main():
    """
    Main entry point.

    Reads "--dry-run" and "--force" from sys.argv. In live mode without
    "--force" the user must confirm with "yes"; if no input can be read
    (stdin closed or empty) the script exits with status 1 without touching
    the database. Exits with status 1 when the migration raises.
    """
    dryRun = "--dry-run" in sys.argv
    forceRun = "--force" in sys.argv
    separator = "=" * 60

    if dryRun:
        logger.info(separator)
        logger.info("DRY RUN MODE - No changes will be made")
        logger.info(separator)
    else:
        logger.info(separator)
        logger.info("LIVE MODE - Changes will be applied to the database")
        logger.info(separator)

        if not forceRun:
            # Confirm before proceeding
            try:
                confirm = input("Are you sure you want to proceed? (yes/no): ")
            except EOFError:
                # Non-interactive invocation: refuse to apply changes silently.
                logger.error(
                    "No confirmation could be read from stdin; "
                    "use --force to run non-interactively or --dry-run to preview"
                )
                sys.exit(1)
            if confirm.strip().lower() != "yes":
                logger.info("Migration cancelled")
                return

    try:
        stats = migrateAccessRules(dryRun=dryRun)

        # In dry-run mode nothing was written, so do not claim rules were migrated.
        migratedLabel = " Would be migrated: " if dryRun else " Rules migrated: "

        logger.info(separator)
        logger.info("Migration completed!")
        logger.info(f" Total rules checked: {stats['total_rules']}")
        logger.info(f"{migratedLabel}{stats['migrated']}")
        logger.info(f" Already qualified: {stats['already_qualified']}")
        logger.info(f" Skipped (null item): {stats['skipped_null']}")
        logger.info(f" Skipped (feature not mapped): {stats['skipped_unmapped_feature']}")
        logger.info(f" Errors: {stats['errors']}")
        logger.info(separator)

    except Exception as e:
        logger.error(f"Migration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()