gateway/modules/interfaces/interfaceBootstrap.py
2026-02-10 00:10:07 +01:00

1847 lines
66 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Centralized bootstrap interface for system initialization.
Contains all bootstrap logic including mandate, users, and RBAC rules.
Multi-Tenant Design:
- Roles are created with a context (mandateId=None for global template roles)
- AccessRules reference roleId (FK), not roleLabel
- The admin user gets isSysAdmin=True instead of roleLabels
"""
import logging
from typing import Optional, Dict
from passlib.context import CryptContext
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelUam import (
Mandate,
UserInDB,
AuthAuthority,
)
from modules.datamodels.datamodelRbac import (
AccessRule,
AccessRuleContext,
Role,
)
from modules.datamodels.datamodelUam import AccessLevel
from modules.datamodels.datamodelMembership import (
UserMandate,
UserMandateRole,
)
logger = logging.getLogger(__name__)
# Password hashing context; argon2 is the only configured scheme
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
# Module-level cache of role IDs keyed by role label (roleLabel -> roleId).
# It is reset by initRoles() and filled lazily by _getRoleId().
_roleIdCache: Dict[str, str] = {}
def initBootstrap(db: DatabaseConnector) -> None:
    """
    Run the full system bootstrap sequence.

    The steps are executed in a fixed order: root mandate, mandate migration,
    role templates, RBAC rules, per-mandate role instances, system users,
    memberships, database optimizations, automation templates and finally the
    root mandate's features and billing settings.

    Args:
        db: Database connector instance
    """
    logger.info("Starting system bootstrap")

    # Root mandate, followed by the description -> label migration of mandates
    mandateId = initRootMandate(db)
    _migrateMandateDescriptionToLabel(db)

    # System role TEMPLATES (mandateId=None, isSystemRole=True) and their RBAC rules
    initRoles(db)
    initRbacRules(db)

    # Copy the template roles into every mandate as mandate-instance roles.
    # This doubles as a migration for mandates that have no instance roles yet.
    _ensureAllMandatesHaveSystemRoles(db)

    # System users
    adminUserId = initAdminUser(db, mandateId)
    eventUserId = initEventUser(db, mandateId)

    # Memberships (UserMandate + UserMandateRole) need all three IDs and use
    # the mandate-instance roles, not the template roles.
    if all((adminUserId, eventUserId, mandateId)):
        assignInitialUserMemberships(db, mandateId, adminUserId, eventUserId)

    # Multi-tenant database optimizations (indexes, triggers, FKs)
    _applyDatabaseOptimizations(db)

    # Automation templates are seeded after the admin user exists
    initAutomationTemplates(db, adminUserId)

    # Feature instances and billing settings only make sense with a root mandate
    if mandateId:
        initRootMandateFeatures(db, mandateId)
        initRootMandateBilling(mandateId)
def initAutomationTemplates(dbApp: DatabaseConnector, adminUserId: Optional[str] = None) -> None:
    """
    Seed the automation templates defined in subAutomationTemplates.py.

    Nothing is written when the AutomationTemplate table already holds records.
    The templates are stored through a dedicated connector to the
    poweron_automation database; dbApp is only used to look up the admin user
    when no adminUserId is passed in.

    Args:
        dbApp: Database connector for poweron_app (admin user lookup)
        adminUserId: Admin user ID used as connector user context
    """
    import json
    from modules.features.automation.subAutomationTemplates import AUTOMATION_TEMPLATES
    from modules.features.automation.datamodelFeatureAutomation import AutomationTemplate
    from modules.shared.configuration import APP_CONFIG

    # Separate connector: templates live in poweron_automation, not poweron_app
    dbAutomation = DatabaseConnector(
        dbHost=APP_CONFIG.get("DB_HOST", "_no_config_default_data"),
        dbDatabase="poweron_automation",
        dbUser=APP_CONFIG.get("DB_USER"),
        dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
        dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
        userId=adminUserId,
    )
    dbAutomation.initDbSystem()

    # Seeding is a one-time operation: stop as soon as any template exists
    alreadySeeded = dbAutomation.getRecordset(AutomationTemplate)
    if alreadySeeded:
        logger.info(f"Automation templates already seeded ({len(alreadySeeded)} templates)")
        return

    # Fall back to looking up the admin user by the configured admin e-mail
    if not adminUserId:
        adminMatches = dbApp.getRecordset(UserInDB, recordFilter={"email": APP_CONFIG.ADMIN_EMAIL})
        adminUserId = adminMatches[0]["id"] if adminMatches else None
    if adminUserId:
        dbAutomation.updateContext(adminUserId)

    createdCount = 0
    for position, templateSet in enumerate(AUTOMATION_TEMPLATES.get("sets", []), start=1):
        content = templateSet.get("template", {})
        overview = content.get("overview", f"Template {position}")
        # The same text is stored under both language keys.
        # NOTE(review): the German key here is "ge" while the role descriptions
        # in this module use "de" - confirm which key TextMultilingual expects.
        payload = {
            "label": {"en": overview, "ge": overview},
            "overview": {"en": overview, "ge": overview},
            "template": json.dumps(content),  # entire template stored as JSON text
        }
        try:
            dbAutomation.recordCreate(AutomationTemplate, payload)
            createdCount += 1
            logger.debug(f"Created automation template: {overview}")
        except Exception as e:
            # Best effort: a failing template must not abort the remaining ones
            logger.error(f"Failed to create automation template '{overview}': {e}")

    logger.info(f"Seeded {createdCount} automation templates in poweron_automation database")
    # NOTE(review): this message is emitted here, i.e. only when templates were
    # seeded in this run - confirm whether it belongs at the end of initBootstrap().
    logger.info("System bootstrap completed")
def initRootMandateFeatures(db: DatabaseConnector, mandateId: str) -> None:
    """
    Create feature instances in the root mandate for auto-created features.

    Every feature main module exposing getFeatureDefinition() is inspected; a
    feature whose definition has autoCreateInstance set gets one instance in
    the given mandate unless an instance for that feature code already exists.

    Args:
        db: Database connector instance
        mandateId: Root mandate ID
    """
    from modules.datamodels.datamodelFeatures import FeatureInstance
    from modules.interfaces.interfaceFeatures import getFeatureInterface
    from modules.system.registry import loadFeatureMainModules

    logger.info("Initializing root mandate features")

    # Phase 1: collect (code, label) pairs of all features flagged for auto-creation
    pending = []
    for featureName, module in loadFeatureMainModules().items():
        if not hasattr(module, "getFeatureDefinition"):
            continue
        try:
            definition = module.getFeatureDefinition()
            if not definition.get("autoCreateInstance", False):
                continue
            code = definition.get("code", featureName)
            label = definition.get("label", {}).get("en", featureName)
            pending.append((code, label))
            logger.debug(f"Feature '{code}' marked for auto-creation in root mandate")
        except Exception as e:
            logger.warning(f"Could not read feature definition for '{featureName}': {e}")

    if not pending:
        logger.info("No features marked for auto-creation in root mandate")
        return

    # Phase 2: create the missing instances; one failure does not stop the others
    featureInterface = getFeatureInterface(db)
    for featureCode, featureLabel in pending:
        try:
            alreadyPresent = db.getRecordset(
                FeatureInstance,
                recordFilter={
                    "mandateId": mandateId,
                    "featureCode": featureCode,
                },
            )
            if alreadyPresent:
                logger.info(f"Feature instance for '{featureCode}' already exists in root mandate")
                continue

            # New instance, with the feature's template roles copied along
            instance = featureInterface.createFeatureInstance(
                featureCode=featureCode,
                mandateId=mandateId,
                label=featureLabel,
                enabled=True,
                copyTemplateRoles=True,
            )
            if not instance:
                logger.warning(f"Failed to create feature instance for '{featureCode}'")
                continue

            # The interface may hand back either a dict or a model object
            instanceId = instance.get("id") if isinstance(instance, dict) else instance.id
            logger.info(f"Created feature instance '{instanceId}' for '{featureCode}' in root mandate")
        except Exception as e:
            logger.error(f"Error creating feature instance for '{featureCode}': {e}")

    logger.info("Root mandate features initialization completed")
def initRootMandate(db: DatabaseConnector) -> Optional[str]:
    """
    Return the ID of the root mandate, creating or migrating it when necessary.

    Lookup order:
    1. a mandate with name='root' and isSystem=True (current format),
    2. a legacy mandate with name='Root', which is rewritten to the current format,
    3. otherwise a new root mandate is created.

    Args:
        db: Database connector instance

    Returns:
        ID of the found, migrated or newly created root mandate
    """
    current = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True})
    if current:
        mandateId = current[0].get("id")
        logger.info(f"Root mandate already exists with ID {mandateId}")
        return mandateId

    # Legacy records are named "Root" and lack the isSystem flag
    legacy = db.getRecordset(Mandate, recordFilter={"name": "Root"})
    if legacy:
        mandateId = legacy[0].get("id")
        logger.info(f"Migrating legacy Root mandate {mandateId}: setting name='root', isSystem=True")
        db.recordModify(Mandate, mandateId, {"name": "root", "isSystem": True})
        return mandateId

    logger.info("Creating Root mandate")
    created = db.recordCreate(Mandate, Mandate(name="root", isSystem=True, enabled=True))
    mandateId = created.get("id")
    logger.info(f"Root mandate created with ID {mandateId}")
    return mandateId
def _migrateMandateDescriptionToLabel(db: DatabaseConnector) -> None:
    """
    Migration: fill the 'label' field of Mandate records from 'description'.

    A record is updated only when it has a non-None 'description' and no
    non-None 'label'. The 'description' value itself is left untouched by this
    function. Records that already carry a label are skipped, so repeated runs
    do not modify them again.

    Args:
        db: Database connector instance
    """
    migratedCount = 0
    for record in db.getRecordset(Mandate):
        description = record.get("description")
        # Skip records without a source value or with an existing label
        if description is None or record.get("label") is not None:
            continue
        mandateId = record.get("id")
        db.recordModify(Mandate, mandateId, {"label": description})
        migratedCount += 1
        logger.info(f"Migrated mandate {mandateId}: description -> label")

    if migratedCount > 0:
        logger.info(f"Migrated {migratedCount} mandate(s) from description to label")
    else:
        logger.debug("No mandate description->label migration needed")
def initAdminUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[str]:
    """
    Ensure the 'admin' user exists and carries isSysAdmin=True.

    An existing admin user is only touched to switch on the isSysAdmin flag.
    Otherwise a new local user is created whose password comes from the
    APP_INIT_PASS_ADMIN_SECRET configuration value.

    Args:
        db: Database connector instance
        mandateId: Root mandate ID (not used in this function)

    Returns:
        ID of the existing or newly created admin user
    """
    found = db.getRecordset(UserInDB, recordFilter={"username": "admin"})

    if not found:
        logger.info("Creating Admin user")
        newAdmin = UserInDB(
            username="admin",
            email="admin@example.com",
            fullName="Administrator",
            enabled=True,
            language="en",
            isSysAdmin=True,
            authenticationAuthority=AuthAuthority.LOCAL,
            hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_ADMIN_SECRET")),
        )
        created = db.recordCreate(UserInDB, newAdmin)
        userId = created.get("id")
        logger.info(f"Admin user created with ID {userId}")
        return userId

    record = found[0]
    userId = record.get("id")
    # Repair older records where the flag is missing or False
    if not record.get("isSysAdmin", False):
        logger.info(f"Updating admin user {userId} to set isSysAdmin=True")
        db.recordModify(UserInDB, userId, {"isSysAdmin": True})
    logger.info(f"Admin user already exists with ID {userId}")
    return userId
def initEventUser(db: DatabaseConnector, mandateId: Optional[str]) -> Optional[str]:
    """
    Ensure the 'event' user exists.

    An existing event user is returned unchanged. Otherwise a new local user
    with isSysAdmin=True is created whose password comes from the
    APP_INIT_PASS_EVENT_SECRET configuration value.

    Args:
        db: Database connector instance
        mandateId: Root mandate ID (not used in this function)

    Returns:
        ID of the existing or newly created event user
    """
    found = db.getRecordset(UserInDB, recordFilter={"username": "event"})

    if not found:
        logger.info("Creating Event user")
        newEventUser = UserInDB(
            username="event",
            email="event@example.com",
            fullName="Event",
            enabled=True,
            language="en",
            isSysAdmin=True,
            authenticationAuthority=AuthAuthority.LOCAL,
            hashedPassword=_getPasswordHash(APP_CONFIG.get("APP_INIT_PASS_EVENT_SECRET")),
        )
        created = db.recordCreate(UserInDB, newEventUser)
        userId = created.get("id")
        logger.info(f"Event user created with ID {userId}")
        return userId

    userId = found[0].get("id")
    logger.info(f"Event user already exists with ID {userId}")
    return userId
def initRoles(db: DatabaseConnector) -> None:
    """
    Create the standard template roles (admin, user, viewer) if missing.

    The roles are created as global templates (mandateId=None,
    featureInstanceId=None, featureCode=None, isSystemRole=True). The module
    level role-ID cache is reset and refilled with the IDs of these templates.

    Only global template records count as "existing": mandate-instance roles
    created by copySystemRolesToMandate() carry the same labels, so matching
    on the label alone could cache the ID of a mandate-bound copy instead of
    the template.

    NOTE: SysAdmin is NOT a role - it is a flag (User.isSysAdmin).

    Args:
        db: Database connector instance
    """
    logger.info("Initializing roles")
    global _roleIdCache
    _roleIdCache = {}

    # Standard template roles for mandate/feature-level access
    standardRoles = [
        Role(
            roleLabel="admin",
            description={
                "en": "Administrator - Manage users and resources within mandate scope",
                "de": "Administrator - Benutzer und Ressourcen im Mandanten verwalten",
                "fr": "Administrateur - Gérer les utilisateurs et ressources dans le périmètre du mandat",
            },
            mandateId=None,  # Global template role
            featureInstanceId=None,
            featureCode=None,
            isSystemRole=True,
        ),
        Role(
            roleLabel="user",
            description={
                "en": "User - Standard user with access to own records",
                "de": "Benutzer - Standard-Benutzer mit Zugriff auf eigene Datensätze",
                "fr": "Utilisateur - Utilisateur standard avec accès à ses propres enregistrements",
            },
            mandateId=None,  # Global template role
            featureInstanceId=None,
            featureCode=None,
            isSystemRole=True,
        ),
        Role(
            roleLabel="viewer",
            description={
                "en": "Viewer - Read-only access to group records",
                "de": "Betrachter - Nur-Lese-Zugriff auf Gruppen-Datensätze",
                "fr": "Visualiseur - Accès en lecture seule aux enregistrements du groupe",
            },
            mandateId=None,  # Global template role
            featureInstanceId=None,
            featureCode=None,
            isSystemRole=True,
        ),
    ]

    # Map label -> id for global template roles only. Records bound to a
    # mandate, feature instance or feature are ignored on purpose.
    templateIdsByLabel = {
        existing.get("roleLabel"): existing.get("id")
        for existing in db.getRecordset(Role)
        if existing.get("mandateId") is None
        and existing.get("featureInstanceId") is None
        and existing.get("featureCode") is None
    }

    for role in standardRoles:
        if role.roleLabel in templateIdsByLabel:
            _roleIdCache[role.roleLabel] = templateIdsByLabel[role.roleLabel]
            continue
        try:
            createdRole = db.recordCreate(Role, role)
            _roleIdCache[role.roleLabel] = createdRole.get("id")
            logger.info(f"Created role: {role.roleLabel} with ID {createdRole.get('id')}")
        except Exception as e:
            # Best effort: keep going so the remaining roles are still created
            logger.warning(f"Error creating role {role.roleLabel}: {e}")

    logger.info("Roles initialization completed")
def _ensureAllMandatesHaveSystemRoles(db: DatabaseConnector) -> None:
    """
    Call copySystemRolesToMandate() for every mandate in the database.

    Used for the initial setup as well as a migration for mandates that were
    created before mandate-instance roles existed.

    Args:
        db: Database connector instance
    """
    # An empty or missing result simply yields no iterations
    for mandateRecord in db.getRecordset(Mandate) or []:
        copySystemRolesToMandate(db, mandateRecord.get("id"))
def copySystemRolesToMandate(db: DatabaseConnector, mandateId: str) -> int:
    """
    Copy the system template roles into a mandate as mandate-instance roles.

    Template roles are the Role records with isSystemRole=True and
    mandateId=None. For every template whose label the mandate does not have
    yet (among its roles with featureInstanceId=None), a new Role bound to the
    mandate is created together with copies of the template's AccessRules.

    The AccessRules of a template are only queried when that template is
    actually copied, so a run where the mandate is already complete costs two
    role queries and no rule queries.

    Args:
        db: Database connector instance
        mandateId: Target mandate ID

    Returns:
        Number of roles copied
    """
    import uuid as _uuid

    # System template roles (global, no mandateId)
    templateRoles = db.getRecordset(
        Role,
        recordFilter={"isSystemRole": True, "mandateId": None}
    )
    if not templateRoles:
        logger.debug("No system template roles found to copy")
        return 0

    # Labels of the mandate-level roles this mandate already owns
    existingMandateRoles = db.getRecordset(
        Role,
        recordFilter={"mandateId": mandateId, "featureInstanceId": None}
    )
    existingLabels = {r.get("roleLabel") for r in existingMandateRoles}

    copiedCount = 0
    for templateRole in templateRoles:
        roleLabel = templateRole.get("roleLabel")
        if roleLabel in existingLabels:
            logger.debug(f"Mandate {mandateId} already has role '{roleLabel}', skipping")
            continue

        # Load the template's rules lazily - only for roles that get copied
        templateRules = db.getRecordset(
            AccessRule,
            recordFilter={"roleId": templateRole.get("id")}
        ) or []

        # Create the mandate-instance role
        newRoleId = str(_uuid.uuid4())
        newRole = Role(
            id=newRoleId,
            roleLabel=roleLabel,
            description=templateRole.get("description", {}),
            mandateId=mandateId,
            featureInstanceId=None,
            featureCode=None,
            isSystemRole=True  # Still a system role, but bound to this mandate
        )
        db.recordCreate(Role, newRole.model_dump())

        # Copy the AccessRules, re-pointing them at the new role
        for rule in templateRules:
            newRule = AccessRule(
                id=str(_uuid.uuid4()),
                roleId=newRoleId,
                context=rule.get("context"),
                item=rule.get("item"),
                view=rule.get("view", False),
                read=rule.get("read"),
                create=rule.get("create"),
                update=rule.get("update"),
                delete=rule.get("delete")
            )
            db.recordCreate(AccessRule, newRule.model_dump())

        # Remember the label so a second template with the same label cannot
        # create a duplicate role within this run
        existingLabels.add(roleLabel)
        copiedCount += 1
        logger.info(f"Copied system role '{roleLabel}' to mandate {mandateId} with {len(templateRules)} AccessRules")

    if copiedCount > 0:
        logger.info(f"Copied {copiedCount} system roles to mandate {mandateId}")
    return copiedCount
def _getRoleId(db: DatabaseConnector, roleLabel: str) -> Optional[str]:
    """
    Resolve a role label to a role ID, using the module cache first.

    On a cache miss the roles with that label are read from the database.
    Several records can share one label (the global template plus its
    mandate-bound copies), so the global template - mandateId,
    featureInstanceId and featureCode all None - is preferred. If no such
    record exists, the first match is used. The resolved ID is cached.

    Args:
        db: Database connector
        roleLabel: Role label to look up

    Returns:
        Role ID or None if no role with that label exists
    """
    if roleLabel in _roleIdCache:
        return _roleIdCache[roleLabel]

    # Cache miss: look the label up in the database
    candidates = db.getRecordset(Role, recordFilter={"roleLabel": roleLabel})
    if not candidates:
        logger.warning(f"Role not found: {roleLabel}")
        return None

    globalTemplates = [
        candidate for candidate in candidates
        if candidate.get("mandateId") is None
        and candidate.get("featureInstanceId") is None
        and candidate.get("featureCode") is None
    ]
    # Fall back to the first match when there is no global template
    chosen = globalTemplates[0] if globalTemplates else candidates[0]

    roleId = chosen.get("id")
    _roleIdCache[roleLabel] = roleId
    return roleId
def initRbacRules(db: DatabaseConnector) -> None:
    """
    Seed the RBAC AccessRules, or top them up when rules already exist.

    With an empty AccessRule table the default, table-specific, UI and
    RESOURCE rules are created. When any rule exists already, only the
    "ensure" helpers for UI and DATA context rules are run.

    Args:
        db: Database connector instance
    """
    existingRules = db.getRecordset(AccessRule)

    if not existingRules:
        logger.info("Initializing RBAC rules")
        _createDefaultRoleRules(db)      # generic rules (item=None)
        _createTableSpecificRules(db)    # per-table rules
        _createUiContextRules(db)        # UI context rules
        _createResourceContextRules(db)  # RESOURCE context rules
        logger.info("RBAC rules initialization completed")
        return

    logger.info(f"RBAC rules already exist ({len(existingRules)} rules)")
    # UI and DATA rules may have been introduced after the first bootstrap
    _ensureUiContextRules(db)
    _ensureDataContextRules(db)
def _createDefaultRoleRules(db: DatabaseConnector) -> None:
    """
    Create the generic DATA rules (item=None) for admin, user and viewer.

    Each role that can be resolved via _getRoleId() gets exactly one rule with
    view=True and the access levels listed below. Roles that cannot be
    resolved are skipped.

    Args:
        db: Database connector instance
    """
    # (roleLabel, read, create, update, delete)
    levelsPerRole = (
        # Admin: group-level access, but no generic delete
        ("admin", AccessLevel.GROUP, AccessLevel.GROUP, AccessLevel.GROUP, AccessLevel.NONE),
        # User: own records only
        ("user", AccessLevel.MY, AccessLevel.MY, AccessLevel.MY, AccessLevel.MY),
        # Viewer: read-only group access
        ("viewer", AccessLevel.GROUP, AccessLevel.NONE, AccessLevel.NONE, AccessLevel.NONE),
    )

    defaultRules = []
    for roleLabel, readLevel, createLevel, updateLevel, deleteLevel in levelsPerRole:
        roleId = _getRoleId(db, roleLabel)
        if not roleId:
            continue
        defaultRules.append(AccessRule(
            roleId=roleId,
            context=AccessRuleContext.DATA,
            item=None,
            view=True,
            read=readLevel,
            create=createLevel,
            update=updateLevel,
            delete=deleteLevel,
        ))

    for rule in defaultRules:
        db.recordCreate(AccessRule, rule)
    logger.info(f"Created {len(defaultRules)} default role rules")
def _createTableSpecificRules(db: DatabaseConnector) -> None:
    """
    Create the table-specific DATA rules for the admin, user and viewer roles.

    The rules are described by a permission matrix: one row per table item
    with one permission profile each for admin, user and viewer. Rules are
    built item by item in the order admin, user, viewer; a role that cannot
    be resolved via _getRoleId() is skipped. All rules are built first and
    then written with db.recordCreate().

    Item namespaces used below (as named in the item keys):
    - data.uam.*         User Access Management
    - data.chat.*        Chat data
    - data.files.*       Files
    - data.automation.*  Automation
    - data.billing.*     Billing accounts, transactions and settings

    NOTE: There are no rules for "sysadmin" - SysAdmin is a flag
    (User.isSysAdmin), not a role.

    Args:
        db: Database connector instance
    """
    adminId = _getRoleId(db, "admin")
    userId = _getRoleId(db, "user")
    viewerId = _getRoleId(db, "viewer")

    none = AccessLevel.NONE
    my = AccessLevel.MY
    group = AccessLevel.GROUP
    everything = AccessLevel.ALL

    # Permission profiles: (view, read, create, update, delete)
    noAccess = (False, none, none, none, none)
    readOwn = (True, my, none, none, none)
    readGroup = (True, group, none, none, none)
    readAll = (True, everything, none, none, none)
    crudOwn = (True, my, my, my, my)
    crudGroup = (True, group, group, group, group)
    crudAll = (True, everything, everything, everything, everything)
    readUpdateOwn = (True, my, none, my, none)

    # (item, admin profile, user profile, viewer profile)
    permissionMatrix = (
        # --- UAM namespace ---
        # Mandate: no role-based access at all
        ("data.uam.Mandate", noAccess, noAccess, noAccess),
        # UserInDB: admin manages users at group level, user may edit own record
        ("data.uam.UserInDB", crudGroup, readUpdateOwn, readOwn),
        # UserConnection: own records only
        ("data.uam.UserConnection", crudOwn, crudOwn, readOwn),
        # Invitation: group level for admin, own records otherwise
        ("data.uam.Invitation", crudGroup, crudOwn, readOwn),
        # AuthEvent: audit log, read-only for every role
        ("data.uam.AuthEvent", readAll, readOwn, readOwn),
        # --- Chat namespace ---
        ("data.chat.Prompt", crudOwn, crudOwn, readOwn),
        ("data.chat.ChatWorkflow", crudOwn, crudOwn, readOwn),
        # --- Files namespace ---
        ("data.files.FileItem", crudOwn, crudOwn, readOwn),
        # --- Automation namespace ---
        ("data.automation.AutomationDefinition", crudOwn, crudOwn, readOwn),
        # AutomationTemplate: admin has full ALL access, viewer may read all.
        # NOTE(review): the user role reads only MY while viewer reads ALL -
        # confirm this asymmetry is intended.
        ("data.automation.AutomationTemplate", crudAll, crudOwn, readAll),
        # --- Billing namespace (read-only for all roles) ---
        ("data.billing.BillingAccount", readGroup, readOwn, readOwn),
        ("data.billing.BillingTransaction", readGroup, readOwn, readOwn),
        # BillingSettings: only admin can read
        ("data.billing.BillingSettings", readGroup, noAccess, noAccess),
    )

    tableRules = []
    for itemKey, adminProfile, userProfile, viewerProfile in permissionMatrix:
        assignments = (
            (adminId, adminProfile),
            (userId, userProfile),
            (viewerId, viewerProfile),
        )
        for roleId, (canView, readLevel, createLevel, updateLevel, deleteLevel) in assignments:
            if not roleId:
                continue
            tableRules.append(AccessRule(
                roleId=roleId,
                context=AccessRuleContext.DATA,
                item=itemKey,
                view=canView,
                read=readLevel,
                create=createLevel,
                update=updateLevel,
                delete=deleteLevel,
            ))

    # Persist all table-specific rules
    for rule in tableRules:
        db.recordCreate(AccessRule, rule)
    logger.info(f"Created {len(tableRules)} table-specific rules")
def _createUiContextRules(db: DatabaseConnector) -> None:
    """
    Create UI context rules for controlling UI element visibility.

    Walks NAVIGATION_SECTIONS and builds one AccessRule (context=UI, view=True)
    per (roleId, objectKey):
    - items flagged ``adminOnly`` (on the item itself or on its section) get a
      rule for the "admin" role only
    - every other item gets a rule for each of "admin", "user" and "viewer"

    Rules reference roles by roleId (resolved through _getRoleId), not by
    roleLabel. Roles whose ID cannot be resolved are skipped. Items without an
    objectKey are skipped, and each (roleId, objectKey) pair is emitted at most
    once per call.

    NOTE: No sysadmin rules - SysAdmin users bypass RBAC via isSysAdmin flag.

    Args:
        db: Database connector instance
    """
    from modules.system.mainSystem import NAVIGATION_SECTIONS

    adminId = _getRoleId(db, "admin")
    userId = _getRoleId(db, "user")
    viewerId = _getRoleId(db, "viewer")

    uiRules = []
    # (roleId, objectKey) pairs already queued, so a page that is listed more
    # than once in the navigation does not produce duplicate rules
    queuedCombinations = set()

    for section in NAVIGATION_SECTIONS:
        isAdminSection = section.get("adminOnly", False)
        for item in section.get("items", []):
            objectKey = item.get("objectKey")
            if not objectKey:
                # Without an objectKey the rule would not identify a page
                # (item=None is used for generic rules elsewhere in this file),
                # so such entries are skipped - same as _ensureUiContextRules.
                continue

            isAdminOnly = item.get("adminOnly", False) or isAdminSection
            # Admin-only pages: only admin role; public/normal pages: all roles
            targetRoleIds = [adminId] if isAdminOnly else [adminId, userId, viewerId]

            for roleId in targetRoleIds:
                if not roleId or (roleId, objectKey) in queuedCombinations:
                    continue
                queuedCombinations.add((roleId, objectKey))
                uiRules.append(AccessRule(
                    roleId=roleId,
                    context=AccessRuleContext.UI,
                    item=objectKey,
                    view=True,
                    read=None, create=None, update=None, delete=None,
                ))

    for rule in uiRules:
        db.recordCreate(AccessRule, rule)

    logger.info(f"Created {len(uiRules)} UI context rules")
def _ensureUiContextRules(db: DatabaseConnector) -> None:
    """
    Ensure UI context rules exist for all navigation items.

    Intended to run during bootstrap so that navigation items added after the
    initial rule creation still get their UI rules. Existing rules are left
    untouched; only missing (roleId, objectKey) pairs are created:
    - admin-only items (flag on the item or its section): "admin" role only
    - all other items: "admin", "user" and "viewer"

    Roles whose ID cannot be resolved and items without an objectKey are
    skipped. Each missing pair is created at most once per call, even when the
    same objectKey appears several times in NAVIGATION_SECTIONS.

    Args:
        db: Database connector instance
    """
    from modules.system.mainSystem import NAVIGATION_SECTIONS

    adminId = _getRoleId(db, "admin")
    userId = _getRoleId(db, "user")
    viewerId = _getRoleId(db, "viewer")

    # Get existing UI rules
    existingUiRules = db.getRecordset(
        AccessRule,
        recordFilter={"context": AccessRuleContext.UI.value}
    )

    # Set of (roleId, item) combinations that already exist or are queued
    knownCombinations = set()
    for rule in existingUiRules:
        roleId = rule.get("roleId")
        item = rule.get("item")
        if roleId and item:
            knownCombinations.add((roleId, item))

    # Check each navigation item and collect missing rules
    missingRules = []
    for section in NAVIGATION_SECTIONS:
        isAdminSection = section.get("adminOnly", False)
        for item in section.get("items", []):
            objectKey = item.get("objectKey")
            if not objectKey:
                continue

            isAdminOnly = item.get("adminOnly", False) or isAdminSection
            # Admin-only: only admin role; public/normal: all roles
            targetRoleIds = [adminId] if isAdminOnly else [adminId, userId, viewerId]

            for roleId in targetRoleIds:
                if not roleId or (roleId, objectKey) in knownCombinations:
                    continue
                # Remember the pair immediately so a repeated objectKey in the
                # navigation does not queue the same rule twice in this run.
                knownCombinations.add((roleId, objectKey))
                missingRules.append(AccessRule(
                    roleId=roleId,
                    context=AccessRuleContext.UI,
                    item=objectKey,
                    view=True,
                    read=None, create=None, update=None, delete=None,
                ))

    # Create missing rules; when the list is empty there is nothing to do
    if missingRules:
        for rule in missingRules:
            db.recordCreate(AccessRule, rule)
        logger.info(f"Created {len(missingRules)} missing UI context rules")
def _ensureDataContextRules(db: DatabaseConnector) -> None:
    """
    Ensure DATA context rules exist for a fixed set of tables.

    For every (role, table) pair listed in the plan below, a rule is created
    only when no DATA rule with that (roleId, item) combination exists yet.
    Existing rules are not modified here. Roles whose ID cannot be resolved
    are skipped.

    Covered tables and access levels (read / create-update-delete):
    - ChatWorkflow, AutomationDefinition: admin MY/MY, user MY/MY, viewer MY/NONE
    - AutomationTemplate: admin ALL/GROUP, user MY/MY, viewer ALL/NONE
    - BillingAccount, BillingTransaction: admin GROUP/NONE, user MY/NONE,
      viewer MY/NONE
    - BillingSettings: admin GROUP/NONE, user and viewer no access (view=False)

    After creating missing rules, existing AutomationTemplate rules are
    adjusted via _updateAutomationTemplateRulesToAll.

    Args:
        db: Database connector instance
    """
    levelNone = AccessLevel.NONE
    levelMy = AccessLevel.MY
    levelGroup = AccessLevel.GROUP
    levelAll = AccessLevel.ALL

    # Resolve role IDs once (admin, user, viewer - in that order)
    roleIds = {label: _getRoleId(db, label) for label in ("admin", "user", "viewer")}

    # (roleId, item) pairs that already have a DATA rule
    existingDataRules = db.getRecordset(
        AccessRule,
        recordFilter={"context": AccessRuleContext.DATA.value}
    )
    alreadyCovered = set()
    for record in existingDataRules:
        recordRoleId = record.get("roleId")
        recordItem = record.get("item")
        if recordRoleId and recordItem:
            alreadyCovered.add((recordRoleId, recordItem))

    # Permission profiles: (view, read, create, update, delete)
    manageOwn = (True, levelMy, levelMy, levelMy, levelMy)
    readOwn = (True, levelMy, levelNone, levelNone, levelNone)
    readGroup = (True, levelGroup, levelNone, levelNone, levelNone)
    readAll = (True, levelAll, levelNone, levelNone, levelNone)
    readAllWriteGroup = (True, levelAll, levelGroup, levelGroup, levelGroup)
    noAccess = (False, levelNone, levelNone, levelNone, levelNone)

    # User-owned tables: everyone works on own records, viewer read-only
    userOwnedGrants = (("admin", manageOwn), ("user", manageOwn), ("viewer", readOwn))
    # System-wide templates: admin/viewer read everything, admin writes GROUP-scoped
    templateGrants = (("admin", readAllWriteGroup), ("user", manageOwn), ("viewer", readAll))
    # Billing: read-only for all roles, admin GROUP-wide, others own records
    billingGrants = (("admin", readGroup), ("user", readOwn), ("viewer", readOwn))
    # Billing settings: admin may view, user/viewer are explicitly denied
    billingSettingsGrants = (("admin", readGroup), ("user", noAccess), ("viewer", noAccess))

    plan = (
        ("data.chat.ChatWorkflow", userOwnedGrants),
        ("data.automation.AutomationDefinition", userOwnedGrants),
        ("data.automation.AutomationTemplate", templateGrants),
        ("data.billing.BillingAccount", billingGrants),
        ("data.billing.BillingTransaction", billingGrants),
        ("data.billing.BillingSettings", billingSettingsGrants),
    )

    missingRules = []
    for objectKey, grants in plan:
        for roleLabel, (mayView, readLevel, createLevel, updateLevel, deleteLevel) in grants:
            roleId = roleIds[roleLabel]
            if not roleId or (roleId, objectKey) in alreadyCovered:
                continue
            missingRules.append(AccessRule(
                roleId=roleId,
                context=AccessRuleContext.DATA,
                item=objectKey,
                view=mayView,
                read=readLevel,
                create=createLevel,
                update=updateLevel,
                delete=deleteLevel,
            ))

    # Persist whatever is missing; an empty list means all rules already exist
    if missingRules:
        for rule in missingRules:
            db.recordCreate(AccessRule, rule)
        logger.info(f"Created {len(missingRules)} missing DATA context rules")

    # Bring pre-existing AutomationTemplate rules for admin/viewer up to date
    _updateAutomationTemplateRulesToAll(db, roleIds["admin"], roleIds["viewer"])
def _updateAutomationTemplateRulesToAll(db: DatabaseConnector, adminId: Optional[str], viewerId: Optional[str]) -> None:
    """
    Update existing AutomationTemplate RBAC rules to correct levels.

    - Admin: read is raised to ALL; if create is currently ALL, then
      create/update/delete are all set to GROUP (mandate-scoped writes)
    - Viewer: read is raised from MY to ALL (read-only)

    Rules of other roles, and rules without a roleId, are left untouched.
    Does nothing when neither adminId nor viewerId is given.

    Args:
        db: Database connector instance
        adminId: Role ID of the "admin" role, or None if it could not be resolved
        viewerId: Role ID of the "viewer" role, or None if it could not be resolved
    """
    if not adminId and not viewerId:
        return

    templateObjectKey = "data.automation.AutomationTemplate"

    # Find existing rules for AutomationTemplate
    existingRules = db.getRecordset(
        AccessRule,
        recordFilter={
            "context": AccessRuleContext.DATA.value,
            "item": templateObjectKey
        }
    )

    updatedCount = 0
    for rule in existingRules:
        ruleId = rule.get("id")
        roleId = rule.get("roleId")
        if not roleId:
            # A rule without roleId must not match a missing adminId/viewerId
            # (None == None would otherwise select it for an update).
            continue

        currentReadLevel = rule.get("read")

        if roleId == adminId:
            # Admin: read ALL, write GROUP
            updates = {}
            if currentReadLevel != AccessLevel.ALL.value:
                updates["read"] = AccessLevel.ALL.value
            # Only the create level is inspected; it decides for all write levels
            if rule.get("create") == AccessLevel.ALL.value:
                updates["create"] = AccessLevel.GROUP.value
                updates["update"] = AccessLevel.GROUP.value
                updates["delete"] = AccessLevel.GROUP.value
            if updates:
                db.recordModify(AccessRule, ruleId, updates)
                updatedCount += 1
                logger.debug(f"Updated AutomationTemplate rule {ruleId} for admin to read=ALL, write=GROUP")
        elif roleId == viewerId and currentReadLevel == AccessLevel.MY.value:
            # Viewer: read ALL (read-only)
            db.recordModify(AccessRule, ruleId, {"read": AccessLevel.ALL.value})
            updatedCount += 1
            logger.debug(f"Updated AutomationTemplate rule {ruleId} for viewer to read=ALL")

    if updatedCount > 0:
        logger.info(f"Updated {updatedCount} AutomationTemplate RBAC rules")
def _createResourceContextRules(db: DatabaseConnector) -> None:
    """
    Create RESOURCE context rules for controlling resource access.

    Creates one generic rule (item=None, view=True) for each of the "admin"
    and "user" roles, referenced by roleId rather than roleLabel. The
    "viewer" role gets no RESOURCE rule. Roles whose ID cannot be resolved
    are skipped. Afterwards the AICore provider rules are created via
    _createAicoreProviderRules.

    NOTE: No sysadmin rules - SysAdmin users bypass RBAC via isSysAdmin flag.

    Args:
        db: Database connector instance
    """
    # Only admin and user receive default resource access; viewer is left out
    rolesWithDefaultAccess = ("admin", "user")

    pendingRules = []
    for label in rolesWithDefaultAccess:
        resolvedRoleId = _getRoleId(db, label)
        if not resolvedRoleId:
            continue
        pendingRules.append(AccessRule(
            roleId=resolvedRoleId,
            context=AccessRuleContext.RESOURCE,
            item=None,
            view=True,
            read=None,
            create=None,
            update=None,
            delete=None,
        ))

    for pendingRule in pendingRules:
        db.recordCreate(AccessRule, pendingRule)

    logger.info(f"Created {len(pendingRules)} RESOURCE context rules")

    # Provider-specific rules (resource.aicore.<provider>) are handled separately
    _createAicoreProviderRules(db)
def _createAicoreProviderRules(db: DatabaseConnector) -> None:
    """
    Create RBAC rules for AICore providers (resource.aicore.{provider}).

    Provider access per role:
    - admin: view=True for every discovered provider
    - user: view=True for every provider except "anthropic" (view=False)
    - viewer: no rules are created

    Providers are taken from modelRegistry.discoverConnectors() at call time.
    A rule is only created when no RESOURCE rule exists yet for the same
    (roleId, item). Any exception is logged as a warning and not re-raised.

    Args:
        db: Database connector instance
    """
    try:
        from modules.aicore.aicoreModelRegistry import modelRegistry

        # Discover available connectors dynamically
        providers = [connector.getConnectorType() for connector in modelRegistry.discoverConnectors()]
        if not providers:
            logger.warning("No AICore providers discovered, skipping provider RBAC rules")
            return

        logger.info(f"Creating RBAC rules for AICore providers: {providers}")

        # (role label, predicate deciding the view flag for a provider);
        # viewer is intentionally absent from this list
        rolePolicies = (
            ("admin", lambda providerName: True),
            ("user", lambda providerName: providerName != "anthropic"),
        )

        providerRules = []
        for roleLabel, isAllowedFor in rolePolicies:
            roleId = _getRoleId(db, roleLabel)
            if not roleId:
                continue
            for provider in providers:
                resourceKey = f"resource.aicore.{provider}"
                alreadyPresent = db.getRecordset(
                    AccessRule,
                    recordFilter={
                        "roleId": roleId,
                        "context": AccessRuleContext.RESOURCE.value,
                        "item": resourceKey
                    }
                )
                if alreadyPresent:
                    continue
                providerRules.append(AccessRule(
                    roleId=roleId,
                    context=AccessRuleContext.RESOURCE,
                    item=resourceKey,
                    view=isAllowedFor(provider),
                    read=None, create=None, update=None, delete=None,
                ))

        if not providerRules:
            logger.debug("All AICore provider RBAC rules already exist")
            return

        for rule in providerRules:
            db.recordCreate(AccessRule, rule)
        logger.info(f"Created {len(providerRules)} AICore provider RBAC rules")
    except Exception as e:
        logger.warning(f"Failed to create AICore provider RBAC rules: {e}")
def initRootMandateBilling(mandateId: str) -> None:
    """
    Initialize billing for the root mandate (best effort).

    Steps:
    1. If the mandate has no billing settings yet, create them with the
       PREPAY_USER model and a default user credit of 10.0, then re-read them.
    2. Unless the billing model is UNLIMITED, make sure every user attached to
       the mandate has a billing account. New accounts start with the
       configured defaultUserCredit for PREPAY_USER and with 0.0 for any
       other model.

    Any exception is logged as a warning and swallowed, so a billing problem
    does not propagate to the caller.

    Args:
        mandateId: Root mandate ID
    """
    try:
        from modules.interfaces.interfaceDbBilling import _getRootInterface
        from modules.interfaces.interfaceDbApp import getRootInterface as getAppRootInterface
        from modules.datamodels.datamodelBilling import BillingSettings, BillingModelEnum

        billing = _getRootInterface()
        app = getAppRootInterface()

        # Create the settings record on first run, otherwise reuse it
        activeSettings = billing.getSettings(mandateId)
        if not activeSettings:
            newSettings = BillingSettings(
                mandateId=mandateId,
                billingModel=BillingModelEnum.PREPAY_USER,
                defaultUserCredit=10.0,
                warningThresholdPercent=10.0,
                blockOnZeroBalance=True,
                notifyOnWarning=True
            )
            billing.createSettings(newSettings)
            logger.info("Created billing settings for root mandate: PREPAY_USER with 10 CHF default credit")
            activeSettings = billing.getSettings(mandateId)
        else:
            logger.info("Billing settings for root mandate already exist")

        # Without settings there is no billing model to derive accounts from
        if not activeSettings:
            return

        model = activeSettings.get("billingModel", "UNLIMITED")
        if model == BillingModelEnum.UNLIMITED.value:
            return  # No accounts needed for UNLIMITED

        # Starting balance depends on the billing model; per the original
        # note, the other models (PREPAY_MANDATE / CREDIT_POSTPAY) keep the
        # budget on a pool, hence 0.0 per user
        if model == BillingModelEnum.PREPAY_USER.value:
            startBalance = activeSettings.get("defaultUserCredit", 10.0)
        else:
            startBalance = 0.0

        createdCount = 0
        for membership in app.getUserMandatesByMandate(mandateId):
            # Memberships may arrive as dicts or as objects
            if isinstance(membership, dict):
                memberId = membership.get("userId")
            else:
                memberId = getattr(membership, "userId", None)
            if not memberId:
                continue
            if billing.getUserAccount(mandateId, memberId):
                continue
            billing.getOrCreateUserAccount(mandateId, memberId, initialBalance=startBalance)
            createdCount += 1
            logger.debug(f"Created billing account for user {memberId}")

        if createdCount > 0:
            logger.info(f"Created {createdCount} billing accounts for root mandate users with {startBalance} CHF each")
    except Exception as e:
        logger.warning(f"Failed to initialize root mandate billing (non-critical): {e}")
def assignInitialUserMemberships(
    db: DatabaseConnector,
    mandateId: str,
    adminUserId: str,
    eventUserId: str
) -> None:
    """
    Give the admin and event users an "admin" membership in the root mandate.

    Membership is expressed through two records per user: a UserMandate
    (user <-> mandate) and a UserMandateRole (membership <-> role). Both are
    only created when they do not exist yet. The role used is the
    mandate-instance "admin" role (mandateId set, featureInstanceId None),
    not the global template; if it cannot be found, a warning is logged and
    nothing is assigned.

    NOTE: SysAdmin is a flag (User.isSysAdmin), not a role; it is not
    touched by this function.

    Args:
        db: Database connector instance
        mandateId: Root mandate ID
        adminUserId: Admin user ID
        eventUserId: Event user ID
    """
    # Look up the mandate-instance "admin" role
    roleCandidates = db.getRecordset(
        Role,
        recordFilter={"roleLabel": "admin", "mandateId": mandateId, "featureInstanceId": None}
    )
    adminRoleId = None
    if roleCandidates:
        adminRoleId = roleCandidates[0].get("id")

    if not adminRoleId:
        logger.warning(f"Admin role not found for mandate {mandateId}, skipping membership assignment")
        return

    initialUsers = ((adminUserId, "admin"), (eventUserId, "event"))
    for memberUserId, memberName in initialUsers:
        # Reuse an existing UserMandate or create a new one
        memberships = db.getRecordset(
            UserMandate,
            recordFilter={"userId": memberUserId, "mandateId": mandateId}
        )
        if not memberships:
            newMembership = UserMandate(
                userId=memberUserId,
                mandateId=mandateId,
                enabled=True
            )
            stored = db.recordCreate(UserMandate, newMembership)
            userMandateId = stored.get("id")
            logger.info(f"Created UserMandate for {memberName} user with ID {userMandateId}")
        else:
            userMandateId = memberships[0].get("id")

        # Attach the admin role unless the membership already carries it
        roleLinks = db.getRecordset(
            UserMandateRole,
            recordFilter={"userMandateId": userMandateId, "roleId": adminRoleId}
        )
        if roleLinks:
            continue

        newRoleLink = UserMandateRole(
            userMandateId=userMandateId,
            roleId=adminRoleId
        )
        db.recordCreate(UserMandateRole, newRoleLink)
        logger.info(f"Assigned admin role to {memberName} user in mandate")
def _getPasswordHash(password: Optional[str]) -> Optional[str]:
"""
Hash a password using Argon2.
Args:
password: Plain text password
Returns:
Hashed password or None if password is None
"""
if password is None:
return None
return pwdContext.hash(password)
def _applyDatabaseOptimizations(db: DatabaseConnector) -> None:
    """
    Apply multi-tenant database optimizations after bootstrap.

    Delegates to applyMultiTenantOptimizations and logs its result:
    - every entry in the result's "errors" list is logged as a warning
    - otherwise, if anything was created, the number of indexes, triggers
      and foreign keys is logged; counters missing from the result count as 0
    - if nothing was created, nothing is logged (presumably the
      optimizations were already applied by an earlier run)

    Failures never propagate: a missing optimization module or any other
    exception is logged as a warning so bootstrap can continue.

    Args:
        db: Database connector instance
    """
    try:
        from modules.shared.dbMultiTenantOptimizations import applyMultiTenantOptimizations

        result = applyMultiTenantOptimizations(db)

        errors = result.get("errors")
        if errors:
            for error in errors:
                logger.warning(f"DB optimization error: {error}")
            return

        # Read each counter once with a default, so the log line below cannot
        # raise KeyError when the result omits one of them.
        indexesCreated = result.get("indexesCreated", 0)
        triggersCreated = result.get("triggersCreated", 0)
        foreignKeysCreated = result.get("foreignKeysCreated", 0)

        if indexesCreated + triggersCreated + foreignKeysCreated > 0:
            logger.info(
                f"Applied DB optimizations: {indexesCreated} indexes, "
                f"{triggersCreated} triggers, "
                f"{foreignKeysCreated} foreign keys"
            )
    except ImportError as e:
        logger.warning(f"DB optimizations module not available: {e}")
    except Exception as e:
        # Don't fail bootstrap if optimizations fail
        logger.warning(f"Failed to apply DB optimizations (non-critical): {e}")