#!/usr/bin/env python3 """ Cleanup script for duplicate roles in the database. This script removes duplicate Role records that were created due to the IS NULL bug in connectorDbPostgre.py. The bug caused `mandateId = NULL` to always return FALSE, which meant the duplicate check in bootstrap didn't work. Usage: python cleanupDuplicateRoles.py The script will: 1. Find all duplicate roles (same roleLabel + featureCode + featureInstanceId + mandateId) 2. Keep the oldest one (first created) and delete the rest 3. Report the number of deleted roles """ import sys import os # Add parent directory to path gatewayDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, gatewayDir) # Load environment variables from env_dev.env from dotenv import load_dotenv envPath = os.path.join(gatewayDir, "env_dev.env") if os.path.exists(envPath): load_dotenv(envPath) from modules.datamodels.datamodelRbac import Role from modules.security.rootAccess import getRootDbAppConnector import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def _getDbConnector(): """Get a database connector using the application's configuration.""" return getRootDbAppConnector() def cleanupDuplicateRoles(): """ Clean up duplicate roles in the database. Keeps the first role (by ID, which is UUID-based) for each unique combination of: - roleLabel - featureCode - featureInstanceId - mandateId """ db = _getDbConnector() # Get all roles allRoles = db.getRecordset(Role, recordFilter=None) logger.info(f"Found {len(allRoles)} total roles in database") # Group roles by their unique key roleGroups = {} for role in allRoles: # Create a key tuple for grouping # Note: None values need special handling for dict keys key = ( role.get("roleLabel"), role.get("featureCode") or "__NONE__", role.get("featureInstanceId") or "__NONE__", role.get("mandateId") or "__NONE__" ) if key not in roleGroups: roleGroups[key] = [] roleGroups[key].append(role) # Find and delete duplicates deletedCount = 0 for key, roles in roleGroups.items(): if len(roles) > 1: # Sort by ID (UUID, string comparison works for finding "first") # Actually, we want to keep one - let's keep by created order if available # Since there's no createdAt, we'll just keep the first one toKeep = roles[0] toDelete = roles[1:] logger.info(f"Found {len(roles)} duplicates for key {key}") logger.info(f" Keeping: {toKeep.get('id')} ({toKeep.get('roleLabel')})") for role in toDelete: roleId = role.get("id") try: db.recordDelete(Role, roleId) deletedCount += 1 logger.info(f" Deleted: {roleId}") except Exception as e: logger.error(f" Failed to delete {roleId}: {e}") logger.info(f"Cleanup complete: {deletedCount} duplicate roles deleted") logger.info(f"Remaining roles: {len(allRoles) - deletedCount}") return deletedCount def showRoleSummary(): """Show a summary of roles grouped by type.""" db = _getDbConnector() allRoles = db.getRecordset(Role, recordFilter=None) # Categorize roles systemRoles = [] templateRoles = [] mandateRoles = [] instanceRoles = [] for role in allRoles: mandateId = role.get("mandateId") featureInstanceId = role.get("featureInstanceId") featureCode = role.get("featureCode") isSystemRole = role.get("isSystemRole", False) if isSystemRole: systemRoles.append(role) elif mandateId is None and featureInstanceId is None and featureCode: templateRoles.append(role) elif mandateId is None and featureInstanceId is None and not featureCode: # Global non-system role (shouldn't exist normally) systemRoles.append(role) elif mandateId and featureInstanceId is None: mandateRoles.append(role) elif featureInstanceId: instanceRoles.append(role) print("\n" + "=" * 60) print("ROLE SUMMARY") print("=" * 60) print(f"\n1. SYSTEM ROLES ({len(systemRoles)}):") for r in systemRoles: print(f" - {r.get('roleLabel')} (isSystemRole={r.get('isSystemRole')})") print(f"\n2. TEMPLATE ROLES ({len(templateRoles)}) - (mandateId=NULL, featureInstanceId=NULL, featureCode!=NULL):") templateByFeature = {} for r in templateRoles: fc = r.get("featureCode") if fc not in templateByFeature: templateByFeature[fc] = [] templateByFeature[fc].append(r) for fc, roles in sorted(templateByFeature.items()): print(f" [{fc}] ({len(roles)} roles):") for r in roles: print(f" - {r.get('roleLabel')}") print(f"\n3. MANDATE ROLES ({len(mandateRoles)}) - (mandateId!=NULL, featureInstanceId=NULL):") for r in mandateRoles[:10]: # Show max 10 print(f" - {r.get('roleLabel')} (mandate: {r.get('mandateId')[:8]}...)") if len(mandateRoles) > 10: print(f" ... and {len(mandateRoles) - 10} more") print(f"\n4. INSTANCE ROLES ({len(instanceRoles)}) - (featureInstanceId!=NULL):") for r in instanceRoles[:10]: # Show max 10 print(f" - {r.get('roleLabel')} (instance: {r.get('featureInstanceId')[:8]}...)") if len(instanceRoles) > 10: print(f" ... and {len(instanceRoles) - 10} more") print("\n" + "=" * 60) print(f"TOTAL: {len(allRoles)} roles") print("=" * 60 + "\n") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Cleanup duplicate roles in database") parser.add_argument("--summary", action="store_true", help="Show role summary without deleting") parser.add_argument("--cleanup", action="store_true", help="Delete duplicate roles") args = parser.parse_args() if args.summary: showRoleSummary() elif args.cleanup: cleanupDuplicateRoles() showRoleSummary() else: # Default: show summary only showRoleSummary() print("\nTo delete duplicates, run with --cleanup flag")