""" Centralized bootstrap interface for system initialization. Contains all bootstrap logic including mandate, users, and RBAC rules. """ import logging from typing import Optional from passlib.context import CryptContext from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG from modules.datamodels.datamodelUam import ( Mandate, UserInDB, AuthAuthority, ) from modules.datamodels.datamodelRbac import ( AccessRule, AccessRuleContext, ) from modules.datamodels.datamodelUam import AccessLevel logger = logging.getLogger(__name__) # Password-Hashing pwdContext = CryptContext(schemes=["argon2"], deprecated="auto") def initBootstrap(db: DatabaseConnector) -> None: """ Main bootstrap entry point - initializes all system components. Args: db: Database connector instance """ logger.info("Starting system bootstrap") # Initialize root mandate mandateId = initRootMandate(db) # Initialize admin user adminUserId = initAdminUser(db, mandateId) # Initialize event user eventUserId = initEventUser(db, mandateId) # Initialize RBAC rules initRbacRules(db) # Assign initial user roles if adminUserId and eventUserId: assignInitialUserRoles(db, adminUserId, eventUserId) logger.info("System bootstrap completed") def initRootMandate(db: DatabaseConnector) -> Optional[str]: """ Creates the Root mandate if it doesn't exist. Args: db: Database connector instance Returns: Mandate ID if created or found, None otherwise """ existingMandates = db.getRecordset(Mandate) if existingMandates: mandateId = existingMandates[0].get("id") logger.info(f"Root mandate already exists with ID {mandateId}") return mandateId logger.info("Creating Root mandate") rootMandate = Mandate(name="Root", language="en", enabled=True) createdMandate = db.recordCreate(Mandate, rootMandate) mandateId = createdMandate.get("id") logger.info(f"Root mandate created with ID {mandateId}") return mandateId def initAdminUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[str]: """ Creates the Admin user if it doesn't exist. Args: db: Database connector instance mandateId: Root mandate ID Returns: User ID if created or found, None otherwise """ existingUsers = db.getRecordset(UserInDB, recordFilter={"username": "admin"}) if existingUsers: userId = existingUsers[0].get("id") logger.info(f"Admin user already exists with ID {userId}") return userId logger.info("Creating Admin user") adminUser = UserInDB( mandateId=mandateId, username="admin", email="admin@example.com", fullName="Administrator", enabled=True, language="en", roleLabels=["sysadmin"], authenticationAuthority=AuthAuthority.LOCAL, hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_ADMIN_SECRET")), connections=[], ) createdUser = db.recordCreate(UserInDB, adminUser) userId = createdUser.get("id") logger.info(f"Admin user created with ID {userId}") return userId def initEventUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[str]: """ Creates the Event user if it doesn't exist. Args: db: Database connector instance mandateId: Root mandate ID Returns: User ID if created or found, None otherwise """ existingUsers = db.getRecordset(UserInDB, recordFilter={"username": "event"}) if existingUsers: userId = existingUsers[0].get("id") logger.info(f"Event user already exists with ID {userId}") return userId logger.info("Creating Event user") eventUser = UserInDB( mandateId=mandateId, username="event", email="event@example.com", fullName="Event", enabled=True, language="en", roleLabels=["sysadmin"], authenticationAuthority=AuthAuthority.LOCAL, hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_EVENT_SECRET")), connections=[], ) createdUser = db.recordCreate(UserInDB, eventUser) userId = createdUser.get("id") logger.info(f"Event user created with ID {userId}") return userId def initRbacRules(db: DatabaseConnector) -> None: """ Initialize RBAC rules if they don't exist. Converts all UAM logic from interface*Access.py modules to RBAC rules. Args: db: Database connector instance """ existingRules = db.getRecordset(AccessRule) if existingRules: logger.info(f"RBAC rules already exist ({len(existingRules)} rules)") return logger.info("Initializing RBAC rules") # Create default role rules createDefaultRoleRules(db) # Create table-specific rules (converted from UAM logic) createTableSpecificRules(db) logger.info("RBAC rules initialization completed") def createDefaultRoleRules(db: DatabaseConnector) -> None: """ Create default role rules for generic access (item = null). Args: db: Database connector instance """ defaultRules = [ # SysAdmin Role - Full access to all AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item=None, view=True, read=AccessLevel.ALL, create=AccessLevel.ALL, update=AccessLevel.ALL, delete=AccessLevel.ALL, ), # Admin Role - Group-level access AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item=None, view=True, read=AccessLevel.GROUP, create=AccessLevel.GROUP, update=AccessLevel.GROUP, delete=AccessLevel.NONE, ), # User Role - My records only AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item=None, view=True, read=AccessLevel.MY, create=AccessLevel.MY, update=AccessLevel.MY, delete=AccessLevel.MY, ), # Viewer Role - Read-only group access AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item=None, view=True, read=AccessLevel.GROUP, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, ), ] for rule in defaultRules: db.recordCreate(AccessRule, rule) logger.info(f"Created {len(defaultRules)} default role rules") def createTableSpecificRules(db: DatabaseConnector) -> None: """ Create table-specific rules converted from UAM logic. These rules override generic rules for specific tables. Args: db: Database connector instance """ tableRules = [] # Mandate table - Only sysadmin can access tableRules.append(AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item="Mandate", view=True, read=AccessLevel.ALL, create=AccessLevel.ALL, update=AccessLevel.ALL, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item="Mandate", view=False, read=AccessLevel.NONE, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) tableRules.append(AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item="Mandate", view=False, read=AccessLevel.NONE, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) tableRules.append(AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item="Mandate", view=False, read=AccessLevel.NONE, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) # UserInDB table tableRules.append(AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item="UserInDB", view=True, read=AccessLevel.ALL, create=AccessLevel.ALL, update=AccessLevel.ALL, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item="UserInDB", view=True, read=AccessLevel.GROUP, create=AccessLevel.GROUP, update=AccessLevel.GROUP, delete=AccessLevel.GROUP, )) tableRules.append(AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item="UserInDB", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.MY, delete=AccessLevel.NONE, )) tableRules.append(AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item="UserInDB", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) # UserConnection table tableRules.append(AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item="UserConnection", view=True, read=AccessLevel.ALL, create=AccessLevel.ALL, update=AccessLevel.ALL, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item="UserConnection", view=True, read=AccessLevel.GROUP, create=AccessLevel.GROUP, update=AccessLevel.GROUP, delete=AccessLevel.GROUP, )) tableRules.append(AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item="UserConnection", view=True, read=AccessLevel.MY, create=AccessLevel.MY, update=AccessLevel.MY, delete=AccessLevel.MY, )) tableRules.append(AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item="UserConnection", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) # DataNeutraliserConfig table tableRules.append(AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item="DataNeutraliserConfig", view=True, read=AccessLevel.ALL, create=AccessLevel.ALL, update=AccessLevel.ALL, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item="DataNeutraliserConfig", view=True, read=AccessLevel.GROUP, create=AccessLevel.GROUP, update=AccessLevel.GROUP, delete=AccessLevel.GROUP, )) tableRules.append(AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item="DataNeutraliserConfig", view=True, read=AccessLevel.MY, create=AccessLevel.MY, update=AccessLevel.MY, delete=AccessLevel.MY, )) tableRules.append(AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item="DataNeutraliserConfig", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) # DataNeutralizerAttributes table tableRules.append(AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item="DataNeutralizerAttributes", view=True, read=AccessLevel.ALL, create=AccessLevel.ALL, update=AccessLevel.ALL, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item="DataNeutralizerAttributes", view=True, read=AccessLevel.GROUP, create=AccessLevel.GROUP, update=AccessLevel.GROUP, delete=AccessLevel.GROUP, )) tableRules.append(AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item="DataNeutralizerAttributes", view=True, read=AccessLevel.MY, create=AccessLevel.MY, update=AccessLevel.MY, delete=AccessLevel.MY, )) tableRules.append(AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item="DataNeutralizerAttributes", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) # AuthEvent table tableRules.append(AccessRule( roleLabel="sysadmin", context=AccessRuleContext.DATA, item="AuthEvent", view=True, read=AccessLevel.ALL, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="admin", context=AccessRuleContext.DATA, item="AuthEvent", view=True, read=AccessLevel.ALL, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.ALL, )) tableRules.append(AccessRule( roleLabel="user", context=AccessRuleContext.DATA, item="AuthEvent", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) tableRules.append(AccessRule( roleLabel="viewer", context=AccessRuleContext.DATA, item="AuthEvent", view=True, read=AccessLevel.MY, create=AccessLevel.NONE, update=AccessLevel.NONE, delete=AccessLevel.NONE, )) # Create all table-specific rules for rule in tableRules: db.recordCreate(AccessRule, rule) logger.info(f"Created {len(tableRules)} table-specific rules") def assignInitialUserRoles(db: DatabaseConnector, adminUserId: str, eventUserId: str) -> None: """ Assign initial roles to admin and event users. Args: db: Database connector instance adminUserId: Admin user ID eventUserId: Event user ID """ # Update admin user with sysadmin role adminUser = db.getRecordset(UserInDB, recordFilter={"id": adminUserId}) if adminUser: adminUserData = adminUser[0] if "sysadmin" not in adminUserData.get("roleLabels", []): adminUserData["roleLabels"] = adminUserData.get("roleLabels", []) + ["sysadmin"] db.recordUpdate(UserInDB, adminUserId, adminUserData) logger.info(f"Assigned sysadmin role to admin user {adminUserId}") # Update event user with sysadmin role eventUser = db.getRecordset(UserInDB, recordFilter={"id": eventUserId}) if eventUser: eventUserData = eventUser[0] if "sysadmin" not in eventUserData.get("roleLabels", []): eventUserData["roleLabels"] = eventUserData.get("roleLabels", []) + ["sysadmin"] db.recordUpdate(UserInDB, eventUserId, eventUserData) logger.info(f"Assigned sysadmin role to event user {eventUserId}") def _getPasswordHash(password: Optional[str]) -> Optional[str]: """ Hash a password using Argon2. Args: password: Plain text password Returns: Hashed password or None if password is None """ if password is None: return None return pwdContext.hash(password)