183 lines
6.3 KiB
Python
183 lines
6.3 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Migration Script: Migrate AccessRules to Fully Qualified ObjectKeys

This script migrates existing AccessRules in the database from short item names
(e.g., "dashboard", "positions") to fully qualified ObjectKeys
(e.g., "ui.feature.trustee.dashboard", "ui.feature.trustee.positions").

This is required for the implementation of the Navigation API concept
("Navigation-API-Konzept").

Usage:
    python script_db_migrate_accessrules_objectkeys.py [--dry-run] [--force]

Options:
    --dry-run   Show what would be changed without making actual changes
    --force     Skip the interactive confirmation prompt in live mode
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
# Put the parent of this script's directory at the front of the import path so
# that the function-level `modules.*` imports below can be resolved.
_scriptDir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(_scriptDir))

# Timestamped, level-tagged log lines at INFO level for the whole script.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Short item names that need migrating, grouped per feature code and per
# ObjectKey namespace ("ui" items first, then "resource" items).
_SHORT_ITEMS_BY_FEATURE = {
    "trustee": {
        "ui": ("dashboard", "positions", "documents", "settings", "instance-roles"),
        "resource": ("instance-roles.manage",),
    },
    "realestate": {
        "ui": ("projects", "parcels"),
        "resource": ("project.create", "project.delete"),
    },
}

# Mapping of short item names to fully qualified ObjectKeys per feature.
# Each ObjectKey has the shape "<namespace>.feature.<featureCode>.<shortName>",
# e.g. MIGRATION_MAP["trustee"]["dashboard"] == "ui.feature.trustee.dashboard".
MIGRATION_MAP: Dict[str, Dict[str, str]] = {
    featureCode: {
        shortName: f"{namespace}.feature.{featureCode}.{shortName}"
        for namespace, shortNames in namespaces.items()
        for shortName in shortNames
    }
    for featureCode, namespaces in _SHORT_ITEMS_BY_FEATURE.items()
}
|
|
|
|
|
|
def migrateAccessRules(dryRun: bool = False) -> Dict[str, int]:
    """
    Migrate AccessRules from short item names to fully qualified ObjectKeys.

    Every AccessRule record is inspected. Rules without an item, and rules
    whose item already starts with "ui.", "resource." or "data.", are left
    untouched. For the remaining rules the owning role's ``featureCode``
    selects a sub-map of MIGRATION_MAP, and the rule's ``item`` is replaced by
    the ObjectKey found there.

    Args:
        dryRun: If True, don't make actual changes, just log what would be done.

    Returns:
        Dictionary with migration statistics:
            total_rules: number of AccessRules read.
            migrated: rules updated (in dry-run mode: rules that would be).
            already_qualified: rules whose item is already fully qualified.
            skipped_null: rules without an item.
            skipped_unmapped_feature: rules whose role has no feature code or
                a feature code that is not in MIGRATION_MAP.
            errors: rules whose role was not found, whose item is not in the
                feature's map, or whose update raised.

    Raises:
        Exception: Any error outside the per-rule update is logged with its
            traceback and re-raised.
    """
    from modules.interfaces.interfaceDbApp import getRootInterface
    from modules.datamodels.datamodelRbac import Role, AccessRule

    stats = {
        "total_rules": 0,
        "migrated": 0,
        "already_qualified": 0,
        "skipped_null": 0,
        "skipped_unmapped_feature": 0,
        "errors": 0,
    }

    # An item starting with one of these is treated as already migrated.
    qualifiedPrefixes = ("ui.", "resource.", "data.")

    try:
        rootInterface = getRootInterface()
        db = rootInterface.db

        # Get all AccessRules
        allRules = db.getRecordset(AccessRule, recordFilter=None)
        stats["total_rules"] = len(allRules)

        logger.info(f"Found {len(allRules)} AccessRules to check")

        # Several rules can reference the same role; remember each lookup
        # (including "not found", stored as None) so a role is queried once.
        roleCache: Dict[Any, Optional[Any]] = {}

        for rule in allRules:
            ruleId = rule.get("id")
            item = rule.get("item")
            roleId = rule.get("roleId")

            # Skip rules without item (wildcard rules)
            if item is None:
                stats["skipped_null"] += 1
                continue

            # Skip if already fully qualified
            if item.startswith(qualifiedPrefixes):
                stats["already_qualified"] += 1
                continue

            # Get the role to determine feature code
            if roleId not in roleCache:
                roles = db.getRecordset(Role, recordFilter={"id": roleId})
                roleCache[roleId] = roles[0] if roles else None
            role = roleCache[roleId]
            if role is None:
                logger.warning(f"Rule {ruleId}: Role {roleId} not found, skipping")
                stats["errors"] += 1
                continue

            featureCode = role.get("featureCode")
            if not featureCode or featureCode not in MIGRATION_MAP:
                logger.debug(f"Rule {ruleId}: Feature '{featureCode}' not in migration map, skipping")
                # Counted so the statistics account for every inspected rule.
                stats["skipped_unmapped_feature"] += 1
                continue

            # Lookup the new ObjectKey
            featureMap = MIGRATION_MAP[featureCode]
            if item not in featureMap:
                logger.warning(f"Rule {ruleId}: Item '{item}' not in migration map for feature '{featureCode}'")
                stats["errors"] += 1
                continue

            newItem = featureMap[item]

            if dryRun:
                logger.info(f"[DRY-RUN] Would migrate rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
            else:
                # Update the rule using recordModify; a failure on one rule
                # is counted and does not abort the remaining rules.
                try:
                    db.recordModify(AccessRule, ruleId, {"item": newItem})
                    logger.info(f"Migrated rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
                except Exception as e:
                    logger.error(f"Failed to migrate rule {ruleId}: {e}")
                    stats["errors"] += 1
                    continue

            # In dry-run mode this counts the rules that would be migrated.
            stats["migrated"] += 1

        return stats

    except Exception as e:
        logger.error(f"Migration failed: {e}", exc_info=True)
        raise
|
|
|
|
|
|
def main():
    """
    Main entry point.

    Reads two flags from ``sys.argv``:
        --dry-run: log what would change without modifying the database.
        --force:   in live mode, skip the interactive confirmation prompt.

    In live mode without ``--force`` the user must answer "yes" (case and
    surrounding whitespace are ignored); any other answer, or a closed /
    non-interactive stdin, cancels the migration. If the migration raises,
    the error is logged and the process exits with status 1.
    """
    dryRun = "--dry-run" in sys.argv
    forceRun = "--force" in sys.argv

    separator = "=" * 60

    if dryRun:
        logger.info(separator)
        logger.info("DRY RUN MODE - No changes will be made")
        logger.info(separator)
    else:
        logger.info(separator)
        logger.info("LIVE MODE - Changes will be applied to the database")
        logger.info(separator)

        if not forceRun:
            # Confirm before proceeding
            try:
                confirm = input("Are you sure you want to proceed? (yes/no): ")
            except EOFError:
                # No answer can be read (stdin closed or not interactive):
                # treat it as a refusal instead of crashing with a traceback.
                confirm = ""
            if confirm.strip().lower() != "yes":
                logger.info("Migration cancelled")
                return

    try:
        stats = migrateAccessRules(dryRun=dryRun)
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        sys.exit(1)
    else:
        logger.info(separator)
        logger.info("Migration completed!")
        logger.info(f" Total rules checked: {stats['total_rules']}")
        logger.info(f" Rules migrated: {stats['migrated']}")
        logger.info(f" Already qualified: {stats['already_qualified']}")
        logger.info(f" Skipped (null item): {stats['skipped_null']}")
        logger.info(f" Errors: {stats['errors']}")
        logger.info(separator)


if __name__ == "__main__":
    main()
|