gateway/modules/security/rootAccess.py
2026-01-17 02:17:58 +01:00

94 lines
2.9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Root access management for system-level operations.
Provides secure access to root user and DbApp database connector.
If the database is empty, bootstrap is executed automatically.
"""
import logging
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelUam import User, UserInDB
from modules.shared.configuration import APP_CONFIG
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Lazily created caches. Each starts empty and is filled on first use:
# - _rootDbAppConnector is assigned inside getRootDbAppConnector()
# - _rootUser is assigned inside getRootUser()
_rootDbAppConnector = None
_rootUser = None
# Set to True by _ensureBootstrap() after initBootstrap() has returned,
# so that later calls skip the bootstrap.
_bootstrapExecuted = False
def getRootDbAppConnector() -> DatabaseConnector:
    """
    Return the shared DatabaseConnector instance for the DbApp database.

    The connector is created on first use from the connection settings in
    APP_CONFIG (no user context), initialised via ``initDbSystem()`` and then
    cached in a module-level variable; later calls return the cached
    instance. It is used for accessing system tables like AccessRule.

    The instance is cached only after ``initDbSystem()`` has returned. If
    construction or initialisation raises, nothing is cached and the next
    call retries, instead of handing out a connector whose initialisation
    never completed.

    Returns:
        The cached root DatabaseConnector.
    """
    global _rootDbAppConnector
    if _rootDbAppConnector is None:
        connector = DatabaseConnector(
            dbHost=APP_CONFIG.get("DB_HOST"),
            dbDatabase="poweron_app",
            dbUser=APP_CONFIG.get("DB_USER"),
            dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
            dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
            userId=None,  # No user context for root connector
        )
        connector.initDbSystem()
        # Publish only after initialisation succeeded, so a failure above
        # leaves the cache empty rather than holding a half-initialised object.
        _rootDbAppConnector = connector
    return _rootDbAppConnector
def _ensureBootstrap():
    """
    Run the database bootstrap unless a previous call already completed it.

    Called automatically by getRootUser() when no initial user is found.
    The completion flag is set only after initBootstrap() has returned, so
    a bootstrap that raises will be attempted again on the next call.
    """
    global _bootstrapExecuted
    if not _bootstrapExecuted:
        logger.info("Running bootstrap to initialize database")
        # Imported lazily to avoid circular imports at module load time.
        from modules.interfaces.interfaceBootstrap import initBootstrap
        initBootstrap(getRootDbAppConnector())
        _bootstrapExecuted = True
        logger.info("Bootstrap completed")
def getRootUser() -> User:
    """
    Return the root user (the initial user stored in the database).

    Used for system-level operations that require root privileges. The
    user is loaded once and cached in a module-level variable. If no
    initial user ID exists, bootstrap is triggered and the lookup is
    repeated once.

    Returns:
        The cached root User.

    Raises:
        ValueError: If no initial user ID exists even after bootstrap, or
            if no record is returned for that ID.
    """
    global _rootUser
    if _rootUser is not None:
        return _rootUser

    connector = getRootDbAppConnector()
    rootId = connector.getInitialId(UserInDB)
    if not rootId:
        # No user exists yet: run bootstrap, then look the ID up again.
        logger.info("No initial user found, running bootstrap")
        _ensureBootstrap()
        rootId = connector.getInitialId(UserInDB)
        if not rootId:
            raise ValueError("No initial user ID found in database after bootstrap")

    matches = connector.getRecordset(UserInDB, recordFilter={"id": rootId})
    if not matches:
        raise ValueError("Initial user not found in database")

    # The first returned record is expanded into the User model's keyword arguments.
    _rootUser = User(**matches[0])
    return _rootUser