gateway/modules/interfaces/interfaceBootstrap.py
2026-01-26 12:39:00 +01:00

1044 lines
36 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Centralized bootstrap interface for system initialization.
Contains all bootstrap logic including mandate, users, and RBAC rules.
Multi-Tenant Design:
- Rollen werden mit Kontext erstellt (mandateId=None für globale Template-Rollen)
- AccessRules referenzieren roleId (FK), nicht roleLabel
- Admin-User bekommt isSysAdmin=True statt roleLabels
"""
import logging
from typing import Optional, Dict
from passlib.context import CryptContext
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
Mandate,
UserInDB,
AuthAuthority,
)
from modules.datamodels.datamodelRbac import (
AccessRule,
AccessRuleContext,
Role,
)
from modules.datamodels.datamodelUam import AccessLevel
from modules.datamodels.datamodelMembership import (
UserMandate,
UserMandateRole,
)
logger = logging.getLogger(__name__)
# Password-Hashing
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
# Cache für Role-IDs (roleLabel -> roleId)
_roleIdCache: Dict[str, str] = {}
def initBootstrap(db: DatabaseConnector) -> None:
"""
Main bootstrap entry point - initializes all system components.
Args:
db: Database connector instance
"""
logger.info("Starting system bootstrap")
# Initialize root mandate
mandateId = initRootMandate(db)
# Initialize roles FIRST (needed for AccessRules)
initRoles(db)
# Initialize RBAC rules (uses roleIds from roles)
initRbacRules(db)
# Initialize admin user
adminUserId = initAdminUser(db, mandateId)
# Initialize event user
eventUserId = initEventUser(db, mandateId)
# Assign initial user memberships (via UserMandate + UserMandateRole)
if adminUserId and eventUserId and mandateId:
assignInitialUserMemberships(db, mandateId, adminUserId, eventUserId)
# Apply multi-tenant database optimizations (indexes, triggers, FKs)
_applyDatabaseOptimizations(db)
logger.info("System bootstrap completed")
def initRootMandate(db: DatabaseConnector) -> Optional[str]:
"""
Creates the Root mandate if it doesn't exist.
Args:
db: Database connector instance
Returns:
Mandate ID if created or found, None otherwise
"""
existingMandates = db.getRecordset(Mandate)
if existingMandates:
mandateId = existingMandates[0].get("id")
logger.info(f"Root mandate already exists with ID {mandateId}")
return mandateId
logger.info("Creating Root mandate")
rootMandate = Mandate(name="Root", enabled=True)
createdMandate = db.recordCreate(Mandate, rootMandate)
mandateId = createdMandate.get("id")
logger.info(f"Root mandate created with ID {mandateId}")
return mandateId
def initAdminUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[str]:
"""
Creates the Admin user if it doesn't exist.
Admin user gets isSysAdmin=True for system-level access.
Role assignment is done via UserMandate + UserMandateRole in assignInitialUserMemberships().
Args:
db: Database connector instance
mandateId: Root mandate ID (for membership assignment, not on User)
Returns:
User ID if created or found, None otherwise
"""
existingUsers = db.getRecordset(UserInDB, recordFilter={"username": "admin"})
if existingUsers:
userId = existingUsers[0].get("id")
existingIsSysAdmin = existingUsers[0].get("isSysAdmin", False)
# Ensure admin user has isSysAdmin=True
if not existingIsSysAdmin:
logger.info(f"Updating admin user {userId} to set isSysAdmin=True")
db.recordModify(UserInDB, userId, {"isSysAdmin": True})
logger.info(f"Admin user already exists with ID {userId}")
return userId
logger.info("Creating Admin user")
adminUser = UserInDB(
username="admin",
email="admin@example.com",
fullName="Administrator",
enabled=True,
language="en",
isSysAdmin=True,
authenticationAuthority=AuthAuthority.LOCAL,
hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_ADMIN_SECRET")),
)
createdUser = db.recordCreate(UserInDB, adminUser)
userId = createdUser.get("id")
logger.info(f"Admin user created with ID {userId}")
return userId
def initEventUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[str]:
"""
Creates the Event user if it doesn't exist.
Event user gets isSysAdmin=True for system operations.
Role assignment is done via UserMandate + UserMandateRole in assignInitialUserMemberships().
Args:
db: Database connector instance
mandateId: Root mandate ID (for membership assignment, not on User)
Returns:
User ID if created or found, None otherwise
"""
existingUsers = db.getRecordset(UserInDB, recordFilter={"username": "event"})
if existingUsers:
userId = existingUsers[0].get("id")
logger.info(f"Event user already exists with ID {userId}")
return userId
logger.info("Creating Event user")
eventUser = UserInDB(
username="event",
email="event@example.com",
fullName="Event",
enabled=True,
language="en",
isSysAdmin=True,
authenticationAuthority=AuthAuthority.LOCAL,
hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_EVENT_SECRET")),
)
createdUser = db.recordCreate(UserInDB, eventUser)
userId = createdUser.get("id")
logger.info(f"Event user created with ID {userId}")
return userId
def initRoles(db: DatabaseConnector) -> None:
"""
Initialize standard roles if they don't exist.
Roles are created as GLOBAL (mandateId=None) template roles.
NOTE: SysAdmin is NOT a role - it's a flag (User.isSysAdmin).
SysAdmin users bypass RBAC entirely and have full system access.
These template roles are for mandate/feature-level access control.
Args:
db: Database connector instance
"""
logger.info("Initializing roles")
global _roleIdCache
_roleIdCache = {}
# Standard template roles for mandate/feature-level access
# NOTE: No "sysadmin" role - SysAdmin is a flag (User.isSysAdmin), not a role!
standardRoles = [
Role(
roleLabel="admin",
description={"en": "Administrator - Manage users and resources within mandate scope", "de": "Administrator - Benutzer und Ressourcen im Mandanten verwalten", "fr": "Administrateur - Gérer les utilisateurs et ressources dans le périmètre du mandat"},
mandateId=None, # Global template role
featureInstanceId=None,
featureCode=None,
isSystemRole=True
),
Role(
roleLabel="user",
description={"en": "User - Standard user with access to own records", "de": "Benutzer - Standard-Benutzer mit Zugriff auf eigene Datensätze", "fr": "Utilisateur - Utilisateur standard avec accès à ses propres enregistrements"},
mandateId=None, # Global template role
featureInstanceId=None,
featureCode=None,
isSystemRole=True
),
Role(
roleLabel="viewer",
description={"en": "Viewer - Read-only access to group records", "de": "Betrachter - Nur-Lese-Zugriff auf Gruppen-Datensätze", "fr": "Visualiseur - Accès en lecture seule aux enregistrements du groupe"},
mandateId=None, # Global template role
featureInstanceId=None,
featureCode=None,
isSystemRole=True
),
]
existingRoles = db.getRecordset(Role)
existingRoleLabels = {role.get("roleLabel"): role.get("id") for role in existingRoles}
for role in standardRoles:
if role.roleLabel not in existingRoleLabels:
try:
createdRole = db.recordCreate(Role, role)
_roleIdCache[role.roleLabel] = createdRole.get("id")
logger.info(f"Created role: {role.roleLabel} with ID {createdRole.get('id')}")
except Exception as e:
logger.warning(f"Error creating role {role.roleLabel}: {e}")
else:
_roleIdCache[role.roleLabel] = existingRoleLabels[role.roleLabel]
logger.debug(f"Role {role.roleLabel} already exists with ID {existingRoleLabels[role.roleLabel]}")
logger.info("Roles initialization completed")
def _getRoleId(db: DatabaseConnector, roleLabel: str) -> Optional[str]:
"""
Get role ID by label, using cache or database lookup.
Args:
db: Database connector
roleLabel: Role label to look up
Returns:
Role ID or None if not found
"""
global _roleIdCache
if roleLabel in _roleIdCache:
return _roleIdCache[roleLabel]
# Lookup from database
roles = db.getRecordset(Role, recordFilter={"roleLabel": roleLabel})
if roles:
roleId = roles[0].get("id")
_roleIdCache[roleLabel] = roleId
return roleId
logger.warning(f"Role not found: {roleLabel}")
return None
def initRbacRules(db: DatabaseConnector) -> None:
"""
Initialize RBAC rules if they don't exist.
AccessRules now reference roleId (FK) instead of roleLabel.
Args:
db: Database connector instance
"""
existingRules = db.getRecordset(AccessRule)
if existingRules:
logger.info(f"RBAC rules already exist ({len(existingRules)} rules)")
# Still ensure UI and DATA rules exist (may have been added later)
_ensureUiContextRules(db)
_ensureDataContextRules(db)
return
logger.info("Initializing RBAC rules")
# Create default role rules
_createDefaultRoleRules(db)
# Create table-specific rules (converted from UAM logic)
_createTableSpecificRules(db)
# Create UI context rules
_createUiContextRules(db)
# Create RESOURCE context rules
_createResourceContextRules(db)
logger.info("RBAC rules initialization completed")
def _createDefaultRoleRules(db: DatabaseConnector) -> None:
"""
Create default role rules for generic access (item = null).
Uses roleId instead of roleLabel.
NOTE: No rules for "sysadmin" - SysAdmin is a flag (User.isSysAdmin), not a role!
SysAdmin users bypass RBAC entirely via the isSysAdmin check in getRecordsetWithRBAC().
Args:
db: Database connector instance
"""
defaultRules = []
# Admin Role - Group-level access (highest role-based permission)
adminId = _getRoleId(db, "admin")
if adminId:
defaultRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item=None,
view=True,
read=AccessLevel.GROUP,
create=AccessLevel.GROUP,
update=AccessLevel.GROUP,
delete=AccessLevel.NONE,
))
# User Role - My records only
userId = _getRoleId(db, "user")
if userId:
defaultRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item=None,
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
# Viewer Role - Read-only group access
viewerId = _getRoleId(db, "viewer")
if viewerId:
defaultRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item=None,
view=True,
read=AccessLevel.GROUP,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
for rule in defaultRules:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(defaultRules)} default role rules")
def _createTableSpecificRules(db: DatabaseConnector) -> None:
"""
Create table-specific rules converted from UAM logic.
These rules override generic rules for specific tables.
Uses roleId instead of roleLabel.
NOTE: No rules for "sysadmin" - SysAdmin is a flag (User.isSysAdmin), not a role!
SysAdmin users bypass RBAC entirely via the isSysAdmin check in getRecordsetWithRBAC().
Args:
db: Database connector instance
"""
tableRules = []
# Get role IDs (no sysadmin - that's a flag, not a role!)
adminId = _getRoleId(db, "admin")
userId = _getRoleId(db, "user")
viewerId = _getRoleId(db, "viewer")
# ==========================================================================
# DATA TABLE RULES - Using semantic namespace structure
# ==========================================================================
# Namespace structure:
# - data.uam.* → User Access Management (mandantenübergreifend)
# - data.chat.* → Chat/AI-Daten (benutzer-eigen, kein Mandantenkontext)
# - data.files.* → Dateien (benutzer-eigen)
# - data.automation.* → Automation (benutzer-eigen)
# - data.feature.* → Mandanten-/Feature-spezifische Daten (dynamisch)
#
# GROUP-Berechtigung:
# - data.uam.*: GROUP filtert nach Mandant (via UserMandate)
# - data.chat.*, data.files.*, data.automation.*: GROUP = MY (benutzer-eigen)
# ==========================================================================
# -------------------------------------------------------------------------
# UAM Namespace - User Access Management
# -------------------------------------------------------------------------
# Mandate table - Only SysAdmin (flag) can access, not roles
# Regular roles have no access to Mandate table
if adminId:
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item="data.uam.Mandate",
view=False,
read=AccessLevel.NONE,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
if userId:
tableRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item="data.uam.Mandate",
view=False,
read=AccessLevel.NONE,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.uam.Mandate",
view=False,
read=AccessLevel.NONE,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# UserInDB table - Admin can manage users within group scope
if adminId:
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item="data.uam.UserInDB",
view=True,
read=AccessLevel.GROUP,
create=AccessLevel.GROUP,
update=AccessLevel.GROUP,
delete=AccessLevel.GROUP,
))
if userId:
tableRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item="data.uam.UserInDB",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.MY,
delete=AccessLevel.NONE,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.uam.UserInDB",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# UserConnection: All users only MY-level CRUD (UAM namespace)
for roleId in [adminId, userId]:
if roleId:
tableRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.DATA,
item="data.uam.UserConnection",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.uam.UserConnection",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# Invitation: Standard group-level access (UAM namespace)
if adminId:
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item="data.uam.Invitation",
view=True,
read=AccessLevel.GROUP,
create=AccessLevel.GROUP,
update=AccessLevel.GROUP,
delete=AccessLevel.GROUP,
))
if userId:
tableRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item="data.uam.Invitation",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.uam.Invitation",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# AuthEvent table - Audit logs (UAM namespace, no delete for audit integrity!)
if adminId:
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item="data.uam.AuthEvent",
view=True,
read=AccessLevel.ALL,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
if userId:
tableRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item="data.uam.AuthEvent",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.uam.AuthEvent",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# -------------------------------------------------------------------------
# Chat Namespace - User-owned, no mandate context
# -------------------------------------------------------------------------
# Prompt: Only MY-level access (user-owned, no mandate context)
# Each user manages only their own prompts
for roleId in [adminId, userId]:
if roleId:
tableRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.DATA,
item="data.chat.Prompt",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.chat.Prompt",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# ChatWorkflow: Only MY-level access (user-owned, no mandate context)
for roleId in [adminId, userId]:
if roleId:
tableRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.DATA,
item="data.chat.ChatWorkflow",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.chat.ChatWorkflow",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# -------------------------------------------------------------------------
# Files Namespace - User-owned, no mandate context
# -------------------------------------------------------------------------
# FileItem: Only MY-level access (user-owned)
for roleId in [adminId, userId]:
if roleId:
tableRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.DATA,
item="data.files.FileItem",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.files.FileItem",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# -------------------------------------------------------------------------
# Automation Namespace - User-owned, no mandate context
# -------------------------------------------------------------------------
# AutomationDefinition: Only MY-level access (user-owned)
for roleId in [adminId, userId]:
if roleId:
tableRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.DATA,
item="data.automation.AutomationDefinition",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="data.automation.AutomationDefinition",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# Create all table-specific rules
for rule in tableRules:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(tableRules)} table-specific rules")
def _createUiContextRules(db: DatabaseConnector) -> None:
"""
Create UI context rules for controlling UI element visibility.
Uses roleId instead of roleLabel.
Creates rules for system pages based on NAVIGATION_SECTIONS.
Admin pages require admin role, public pages are available to all.
NOTE: No sysadmin rules - SysAdmin users bypass RBAC via isSysAdmin flag.
Args:
db: Database connector instance
"""
from modules.system.mainSystem import NAVIGATION_SECTIONS
uiRules = []
adminId = _getRoleId(db, "admin")
userId = _getRoleId(db, "user")
viewerId = _getRoleId(db, "viewer")
# Create rules based on navigation sections
for section in NAVIGATION_SECTIONS:
isAdminSection = section.get("adminOnly", False)
for item in section.get("items", []):
objectKey = item.get("objectKey")
isPublic = item.get("public", False)
isAdminOnly = item.get("adminOnly", False) or isAdminSection
if isAdminOnly:
# Admin-only pages: only admin role
if adminId:
uiRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.UI,
item=objectKey,
view=True,
read=None, create=None, update=None, delete=None,
))
else:
# Public/normal pages: all roles
for roleId in [adminId, userId, viewerId]:
if roleId:
uiRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.UI,
item=objectKey,
view=True,
read=None, create=None, update=None, delete=None,
))
for rule in uiRules:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(uiRules)} UI context rules")
def _ensureUiContextRules(db: DatabaseConnector) -> None:
"""
Ensure UI context rules exist for all navigation items.
This is called during bootstrap to add missing UI rules for new navigation items.
Args:
db: Database connector instance
"""
from modules.system.mainSystem import NAVIGATION_SECTIONS
adminId = _getRoleId(db, "admin")
userId = _getRoleId(db, "user")
viewerId = _getRoleId(db, "viewer")
# Get existing UI rules
existingUiRules = db.getRecordset(
AccessRule,
recordFilter={"context": AccessRuleContext.UI.value}
)
# Build set of existing (roleId, item) combinations
existingCombinations = set()
for rule in existingUiRules:
roleId = rule.get("roleId")
item = rule.get("item")
if roleId and item:
existingCombinations.add((roleId, item))
# Check each navigation item and add missing rules
missingRules = []
for section in NAVIGATION_SECTIONS:
isAdminSection = section.get("adminOnly", False)
for item in section.get("items", []):
objectKey = item.get("objectKey")
if not objectKey:
continue
isAdminOnly = item.get("adminOnly", False) or isAdminSection
if isAdminOnly:
# Admin-only: only admin role
if adminId and (adminId, objectKey) not in existingCombinations:
missingRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.UI,
item=objectKey,
view=True,
read=None, create=None, update=None, delete=None,
))
else:
# Public/normal: all roles
for roleId in [adminId, userId, viewerId]:
if roleId and (roleId, objectKey) not in existingCombinations:
missingRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.UI,
item=objectKey,
view=True,
read=None, create=None, update=None, delete=None,
))
# Create missing rules
if missingRules:
for rule in missingRules:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(missingRules)} missing UI context rules")
else:
logger.debug("All UI context rules already exist")
def _ensureDataContextRules(db: DatabaseConnector) -> None:
"""
Ensure DATA context rules exist for key tables like ChatWorkflow and AutomationDefinition.
This is called during bootstrap to add missing DATA rules for new tables.
Args:
db: Database connector instance
"""
adminId = _getRoleId(db, "admin")
userId = _getRoleId(db, "user")
viewerId = _getRoleId(db, "viewer")
# Get existing DATA rules
existingDataRules = db.getRecordset(
AccessRule,
recordFilter={"context": AccessRuleContext.DATA.value}
)
# Build set of existing (roleId, item) combinations
existingCombinations = set()
for rule in existingDataRules:
roleId = rule.get("roleId")
item = rule.get("item")
if roleId and item:
existingCombinations.add((roleId, item))
# Define tables that need rules (user-owned, no mandate context)
# Users can only manage their own records (MY-level access)
tablesNeedingRules = [
"data.chat.ChatWorkflow",
"data.automation.AutomationDefinition",
]
missingRules = []
for objectKey in tablesNeedingRules:
# Admin: MY-level access (user-owned, no mandate context)
if adminId and (adminId, objectKey) not in existingCombinations:
missingRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item=objectKey,
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
# User: MY-level access (user-owned, no mandate context)
if userId and (userId, objectKey) not in existingCombinations:
missingRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item=objectKey,
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
# Viewer: MY read-only (user-owned, no mandate context)
if viewerId and (viewerId, objectKey) not in existingCombinations:
missingRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item=objectKey,
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# Create missing rules
if missingRules:
for rule in missingRules:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(missingRules)} missing DATA context rules")
else:
logger.debug("All DATA context rules already exist")
def _createResourceContextRules(db: DatabaseConnector) -> None:
"""
Create RESOURCE context rules for controlling resource access.
Uses roleId instead of roleLabel.
NOTE: No sysadmin rules - SysAdmin users bypass RBAC via isSysAdmin flag.
Args:
db: Database connector instance
"""
resourceRules = []
# All roles get full resource access by default (no sysadmin - that's a flag)
for roleLabel in ["admin", "user", "viewer"]:
roleId = _getRoleId(db, roleLabel)
if roleId:
resourceRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.RESOURCE,
item=None,
view=True,
read=None,
create=None,
update=None,
delete=None,
))
for rule in resourceRules:
db.recordCreate(AccessRule, rule)
logger.info(f"Created {len(resourceRules)} RESOURCE context rules")
def assignInitialUserMemberships(
db: DatabaseConnector,
mandateId: str,
adminUserId: str,
eventUserId: str
) -> None:
"""
Assign initial memberships to admin and event users via UserMandate + UserMandateRole.
This is the NEW multi-tenant way of assigning roles.
NOTE: SysAdmin is a flag (User.isSysAdmin), not a role. Initial users get the "admin" role
within the root mandate, plus they have isSysAdmin=True for system-level access.
Args:
db: Database connector instance
mandateId: Root mandate ID
adminUserId: Admin user ID
eventUserId: Event user ID
"""
# Use "admin" role for mandate membership (SysAdmin is a flag, not a role!)
adminRoleId = _getRoleId(db, "admin")
if not adminRoleId:
logger.warning("Admin role not found, skipping membership assignment")
return
for userId, userName in [(adminUserId, "admin"), (eventUserId, "event")]:
# Check if UserMandate already exists
existingMemberships = db.getRecordset(
UserMandate,
recordFilter={"userId": userId, "mandateId": mandateId}
)
if existingMemberships:
userMandateId = existingMemberships[0].get("id")
logger.debug(f"UserMandate already exists for {userName} user")
else:
# Create UserMandate
userMandate = UserMandate(
userId=userId,
mandateId=mandateId,
enabled=True
)
createdMembership = db.recordCreate(UserMandate, userMandate)
userMandateId = createdMembership.get("id")
logger.info(f"Created UserMandate for {userName} user with ID {userMandateId}")
# Check if UserMandateRole already exists
existingRoles = db.getRecordset(
UserMandateRole,
recordFilter={"userMandateId": userMandateId, "roleId": adminRoleId}
)
if not existingRoles:
# Create UserMandateRole with "admin" role
userMandateRole = UserMandateRole(
userMandateId=userMandateId,
roleId=adminRoleId
)
db.recordCreate(UserMandateRole, userMandateRole)
logger.info(f"Assigned admin role to {userName} user in mandate")
def _getPasswordHash(password: Optional[str]) -> Optional[str]:
"""
Hash a password using Argon2.
Args:
password: Plain text password
Returns:
Hashed password or None if password is None
"""
if password is None:
return None
return pwdContext.hash(password)
def _applyDatabaseOptimizations(db: DatabaseConnector) -> None:
"""
Apply multi-tenant database optimizations after bootstrap.
Creates indexes, immutable triggers, and foreign key constraints
for the multi-tenant junction tables. All operations are idempotent.
Args:
db: Database connector instance
"""
try:
from modules.shared.dbMultiTenantOptimizations import applyMultiTenantOptimizations
result = applyMultiTenantOptimizations(db)
if result.get("errors"):
for error in result["errors"]:
logger.warning(f"DB optimization error: {error}")
else:
totalCreated = (
result.get("indexesCreated", 0) +
result.get("triggersCreated", 0) +
result.get("foreignKeysCreated", 0)
)
if totalCreated > 0:
logger.info(
f"Applied DB optimizations: {result['indexesCreated']} indexes, "
f"{result['triggersCreated']} triggers, "
f"{result['foreignKeysCreated']} foreign keys"
)
# If nothing created, optimizations were already applied (idempotent)
except ImportError as e:
logger.warning(f"DB optimizations module not available: {e}")
except Exception as e:
# Don't fail bootstrap if optimizations fail
logger.warning(f"Failed to apply DB optimizations (non-critical): {e}")