""" Interface to the Gateway system. Manages users and mandates for authentication. """ from datetime import datetime, timedelta, UTC import os import logging from typing import Dict, Any, List, Optional, Union import importlib import json from passlib.context import CryptContext import uuid from modules.connectors.connectorDbJson import DatabaseConnector from modules.shared.configuration import APP_CONFIG from modules.interfaces.interfaceAppAccess import AppAccess from modules.interfaces.interfaceAppModel import ( User, Mandate, UserInDB, UserConnection, Session, AuthEvent, AuthAuthority, UserPrivilege, ConnectionStatus, Token, LocalToken, GoogleToken, MsftToken ) from modules.shared.attributeUtils import ModelMixin logger = logging.getLogger(__name__) # Singleton factory for AppObjects instances per context _gatewayInterfaces = {} # Root interface instance _rootAppObjects = None # Password-Hashing pwdContext = CryptContext(schemes=["argon2"], deprecated="auto") class AppObjects: """ Interface to the Gateway system. Manages users and mandates. """ def __init__(self, currentUser: Optional[User] = None): """Initializes the Gateway Interface.""" # Initialize variables self.currentUser = currentUser # Store User object directly self.userId = currentUser.id if currentUser else None self.mandateId = currentUser.mandateId if currentUser else None self.access = None # Will be set when user context is provided # Initialize database self._initializeDatabase() # Initialize standard records if needed self._initRecords() # Set user context if provided if currentUser: self.setUserContext(currentUser) def setUserContext(self, currentUser: User): """Sets the user context for the interface.""" if not currentUser: logger.info("Initializing interface without user context") return self.currentUser = currentUser # Store User object directly self.userId = currentUser.id self.mandateId = currentUser.mandateId if not self.userId or not self.mandateId: raise ValueError("Invalid user context: id and mandateId are required") # Add language settings self.userLanguage = currentUser.language # Default user language # Initialize access control with user context self.access = AppAccess(self.currentUser, self.db) # Convert to dict only when needed # Update database context self.db.updateContext(self.userId) logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}") def _initializeDatabase(self): """Initializes the database connection.""" try: # Get configuration values with defaults dbHost = APP_CONFIG.get("DB_APP_HOST", "_no_config_default_data") dbDatabase = APP_CONFIG.get("DB_APP_DATABASE", "app") dbUser = APP_CONFIG.get("DB_APP_USER") dbPassword = APP_CONFIG.get("DB_APP_PASSWORD_SECRET") # Ensure the database directory exists os.makedirs(dbHost, exist_ok=True) self.db = DatabaseConnector( dbHost=dbHost, dbDatabase=dbDatabase, dbUser=dbUser, dbPassword=dbPassword ) logger.info("Database initialized successfully") except Exception as e: logger.error(f"Failed to initialize database: {str(e)}") raise def _initRecords(self): """Initialize standard records if they don't exist.""" self._initRootMandate() self._initAdminUser() def _initRootMandate(self): """Creates the Root mandate if it doesn't exist.""" existingMandateId = self.getInitialId("mandates") mandates = self.db.getRecordset("mandates") if existingMandateId is None or not mandates: logger.info("Creating Root mandate") rootMandate = Mandate( name="Root", language="en", enabled=True ) createdMandate = self.db.recordCreate("mandates", rootMandate.to_dict()) logger.info(f"Root mandate created with ID {createdMandate['id']}") # Register the initial ID self.db._registerInitialId("mandates", createdMandate['id']) # Update mandate context self.mandateId = createdMandate['id'] def _initAdminUser(self): """Creates the Admin user if it doesn't exist.""" existingUserId = self.getInitialId("users") users = self.db.getRecordset("users") if existingUserId is None or not users: logger.info("Creating Admin user") adminUser = UserInDB( mandateId=self.getInitialId("mandates"), username="admin", email="admin@example.com", fullName="Administrator", enabled=True, language="en", privilege=UserPrivilege.SYSADMIN, authenticationAuthority="local", # Using lowercase value directly hashedPassword=self._getPasswordHash("The 1st Poweron Admin"), # Use a secure password in production! connections=[] ) createdUser = self.db.recordCreate("users", adminUser.to_dict()) logger.info(f"Admin user created with ID {createdUser['id']}") # Register the initial ID self.db._registerInitialId("users", createdUser['id']) # Update user context self.currentUser = createdUser self.userId = createdUser.get("id") def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Unified user access management function that filters data based on user privileges and adds access control attributes. Args: table: Name of the table recordset: Recordset to filter based on access rules Returns: Filtered recordset with access control attributes """ # First apply access control filteredRecords = self.access.uam(table, recordset) # Then filter out database-specific fields cleanedRecords = [] for record in filteredRecords: # Create a new dict with only non-database fields cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')} cleanedRecords.append(cleanedRecord) return cleanedRecords def _canModify(self, table: str, recordId: Optional[str] = None) -> bool: """ Checks if the current user can modify (create/update/delete) records in a table. Args: table: Name of the table recordId: Optional record ID for specific record check Returns: Boolean indicating permission """ return self.access.canModify(table, recordId) def _clearTableCache(self, table: str) -> None: """Clears the cache for a specific table to ensure fresh data.""" self.db.clearTableCache(table) def getInitialId(self, table: str) -> Optional[str]: """Returns the initial ID for a table.""" return self.db.getInitialId(table) def _getPasswordHash(self, password: str) -> str: """Creates a hash for a password.""" return pwdContext.hash(password) def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool: """Checks if the password matches the hash.""" return pwdContext.verify(plainPassword, hashedPassword) # User methods def getAllUsers(self) -> List[User]: """Returns users based on user access level.""" allUsers = self.db.getRecordset("users") filteredUsers = self._uam("users", allUsers) # Convert to User models return [User.from_dict(user) for user in filteredUsers] def getUsersByMandate(self, mandateId: str) -> List[User]: """Returns users for a specific mandate if user has access.""" # Get users for this mandate users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId}) filteredUsers = self._uam("users", users) # Convert to User models return [User.from_dict(user) for user in filteredUsers] def getUserByUsername(self, username: str) -> Optional[User]: """Returns a user by username.""" try: # Get users table users = self.db.getRecordset("users") if not users: return None # Find user by username for user_dict in users: if user_dict.get("username") == username: return User.from_dict(user_dict) logger.info(f"No user found with username {username}") return None except Exception as e: logger.error(f"Error getting user by username: {str(e)}") return None def getUser(self, userId: str) -> Optional[User]: """Returns a user by ID if user has access.""" try: # Get all users users = self.db.getRecordset("users") if not users: return None # Find user by ID for user_dict in users: if user_dict.get("id") == userId: # Apply access control filteredUsers = self._uam("users", [user_dict]) if filteredUsers: return User.from_dict(filteredUsers[0]) return None return None except Exception as e: logger.error(f"Error getting user by ID: {str(e)}") return None def getUserConnections(self, userId: str) -> List[UserConnection]: """Returns all connections for a user.""" try: # Get connections for this user connections = self.db.getRecordset("connections", recordFilter={"userId": userId}) # Convert to UserConnection objects result = [] for conn_dict in connections: try: # Convert string dates to datetime objects for field in ['connectedAt', 'lastChecked', 'expiresAt']: if field in conn_dict and conn_dict[field]: try: if isinstance(conn_dict[field], str): conn_dict[field] = datetime.fromisoformat(conn_dict[field].replace('Z', '+00:00')) except (ValueError, TypeError): conn_dict[field] = None # Create UserConnection object connection = UserConnection( id=conn_dict["id"], userId=conn_dict["userId"], authority=conn_dict.get("authority"), externalId=conn_dict.get("externalId", ""), externalUsername=conn_dict.get("externalUsername", ""), externalEmail=conn_dict.get("externalEmail"), status=conn_dict.get("status", "pending"), connectedAt=conn_dict.get("connectedAt"), lastChecked=conn_dict.get("lastChecked"), expiresAt=conn_dict.get("expiresAt") ) result.append(connection) except Exception as e: logger.error(f"Error converting connection dict to object: {str(e)}") continue return result except Exception as e: logger.error(f"Error getting user connections: {str(e)}") return [] def addUserConnection(self, userId: str, authority: AuthAuthority, externalId: str, externalUsername: str, externalEmail: Optional[str] = None, status: ConnectionStatus = ConnectionStatus.PENDING) -> UserConnection: """ Adds a new connection for a user. Args: userId: The ID of the user authority: The authentication authority (e.g., MSFT, GOOGLE) externalId: The external ID from the authority externalUsername: The username from the authority externalEmail: Optional email from the authority status: The connection status (defaults to PENDING) Returns: The created UserConnection object """ try: # Get the user user = self.getUser(userId) if not user: raise ValueError(f"User not found: {userId}") # Create new connection with all required fields connection = UserConnection( id=str(uuid.uuid4()), userId=userId, authority=authority, externalId=externalId, externalUsername=externalUsername, externalEmail=externalEmail, status=status, connectedAt=datetime.now(UTC), lastChecked=datetime.now(UTC), expiresAt=None # Optional field, set to None by default ) # Save to connections table self.db.recordCreate("connections", connection.to_dict()) # Clear cache to ensure fresh data self._clearTableCache("connections") return connection except Exception as e: logger.error(f"Error adding user connection: {str(e)}") raise ValueError(f"Failed to add user connection: {str(e)}") def removeUserConnection(self, connectionId: str) -> None: """Remove a connection to an external service""" try: # Get connection connections = self.db.getRecordset("connections", recordFilter={ "id": connectionId }) if not connections: raise ValueError(f"Connection {connectionId} not found") # Delete connection self.db.recordDelete("connections", connectionId) # Clear cache to ensure fresh data self._clearTableCache("connections") except Exception as e: logger.error(f"Error removing user connection: {str(e)}") raise ValueError(f"Failed to remove user connection: {str(e)}") def authenticateLocalUser(self, username: str, password: str) -> Optional[User]: """Authenticates a user by username and password using local authentication.""" # Clear the users table from cache and reload it self._clearTableCache("users") # Get user by username user = self.getUserByUsername(username) if not user: raise ValueError("User not found") # Check if the user is enabled if not user.enabled: raise ValueError("User is disabled") # Verify that the user has local authentication enabled if user.authenticationAuthority != AuthAuthority.LOCAL: raise ValueError("User does not have local authentication enabled") # Get the full user record with password hash for verification userRecord = self.db.getRecordset("users", recordFilter={"id": user.id})[0] if not userRecord.get("hashedPassword"): raise ValueError("User has no password set") if not self._verifyPassword(password, userRecord["hashedPassword"]): raise ValueError("Invalid password") return user def createUser(self, username: str, password: str = None, email: str = None, fullName: str = None, language: str = "en", enabled: bool = True, privilege: UserPrivilege = UserPrivilege.USER, authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL, externalId: str = None, externalUsername: str = None, externalEmail: str = None) -> User: """Create a new user with optional external connection""" try: # Ensure username is a string username = str(username).strip() # Validate password for local authentication if authenticationAuthority == AuthAuthority.LOCAL: if not password: raise ValueError("Password is required for local authentication") if not isinstance(password, str): raise ValueError("Password must be a string") if not password.strip(): raise ValueError("Password cannot be empty") # Create user data using UserInDB model userData = UserInDB( username=username, email=email, fullName=fullName, language=language, mandateId=self.mandateId, enabled=enabled, privilege=privilege, authenticationAuthority=authenticationAuthority, hashedPassword=self._getPasswordHash(password) if password else None, connections=[] ) # Create user record createdRecord = self.db.recordCreate("users", userData.to_dict()) if not createdRecord or not createdRecord.get("id"): raise ValueError("Failed to create user record") # Clear cache to ensure fresh data self._clearTableCache("users") # Add external connection if provided if externalId and externalUsername: self.addUserConnection( createdRecord["id"], authenticationAuthority, externalId, externalUsername, externalEmail ) # Get created user using the returned ID createdUser = self.db.getRecordset("users", recordFilter={"id": createdRecord["id"]}) if not createdUser or len(createdUser) == 0: raise ValueError("Failed to retrieve created user") # Clear cache to ensure fresh data (already done above) # No need for additional cache clearing since _clearTableCache("users") was called return User.from_dict(createdUser[0]) except ValueError as e: logger.error(f"Error creating user: {str(e)}") raise except Exception as e: logger.error(f"Unexpected error creating user: {str(e)}") raise ValueError(f"Failed to create user: {str(e)}") def updateUser(self, userId: str, updateData: Dict[str, Any]) -> User: """Update a user's information""" try: # Get user user = self.getUser(userId) if not user: raise ValueError(f"User {userId} not found") # Update user data using model updatedData = user.to_dict() updatedData.update(updateData) updatedUser = User.from_dict(updatedData) # Update user record self.db.recordModify("users", userId, updatedUser.to_dict()) # Clear cache to ensure fresh data self._clearTableCache("users") # Get updated user updatedUser = self.getUser(userId) if not updatedUser: raise ValueError("Failed to retrieve updated user") return updatedUser except Exception as e: logger.error(f"Error updating user: {str(e)}") raise ValueError(f"Failed to update user: {str(e)}") def disableUser(self, userId: str) -> User: """Disables a user if current user has permission.""" return self.updateUser(userId, {"enabled": False}) def enableUser(self, userId: str) -> User: """Enables a user if current user has permission.""" return self.updateUser(userId, {"enabled": True}) def _deleteUserReferencedData(self, userId: str) -> None: """Deletes all data associated with a user.""" try: # Delete user sessions sessions = self.db.getRecordset("sessions", recordFilter={"userId": userId}) for session in sessions: self.db.recordDelete("sessions", session["id"]) logger.debug(f"Deleted session {session['id']} for user {userId}") # Delete user auth events events = self.db.getRecordset("auth_events", recordFilter={"userId": userId}) for event in events: self.db.recordDelete("auth_events", event["id"]) logger.debug(f"Deleted auth event {event['id']} for user {userId}") # Delete user tokens tokens = self.db.getRecordset("tokens", recordFilter={"userId": userId}) for token in tokens: self.db.recordDelete("tokens", token["id"]) # Delete user connections connections = self.db.getRecordset("connections", recordFilter={"userId": userId}) for conn in connections: self.db.recordDelete("connections", conn["id"]) logger.debug(f"Deleted connection {conn['id']} for user {userId}") logger.info(f"All referenced data for user {userId} has been deleted") except Exception as e: logger.error(f"Error deleting referenced data for user {userId}: {str(e)}") raise def deleteUser(self, userId: str) -> bool: """Deletes a user if current user has permission.""" try: # Get user user = self.getUser(userId) if not user: raise ValueError(f"User {userId} not found") if not self._canModify("users", userId): raise PermissionError(f"No permission to delete user {userId}") # Delete all referenced data first self._deleteUserReferencedData(userId) # Delete user record success = self.db.recordDelete("users", userId) if not success: raise ValueError(f"Failed to delete user {userId}") # Clear cache to ensure fresh data self._clearTableCache("users") logger.info(f"User {userId} successfully deleted") return True except Exception as e: logger.error(f"Error deleting user: {str(e)}") raise ValueError(f"Failed to delete user: {str(e)}") # Mandate methods def getAllMandates(self) -> List[Mandate]: """Returns all mandates based on user access level.""" allMandates = self.db.getRecordset("mandates") filteredMandates = self._uam("mandates", allMandates) return [Mandate.from_dict(mandate) for mandate in filteredMandates] def getMandate(self, mandateId: str) -> Optional[Mandate]: """Returns a mandate by ID if user has access.""" mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId}) if not mandates: return None filteredMandates = self._uam("mandates", mandates) if not filteredMandates: return None return Mandate.from_dict(filteredMandates[0]) def createMandate(self, name: str, language: str = "en") -> Mandate: """Creates a new mandate if user has permission.""" if not self._canModify("mandates"): raise PermissionError("No permission to create mandates") # Create mandate data using model mandateData = Mandate( name=name, language=language ) # Create mandate record createdRecord = self.db.recordCreate("mandates", mandateData.to_dict()) if not createdRecord or not createdRecord.get("id"): raise ValueError("Failed to create mandate record") # Clear cache to ensure fresh data self._clearTableCache("mandates") return Mandate.from_dict(createdRecord) def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate: """Updates a mandate if user has access.""" try: logger.debug(f"Updating mandate {mandateId} with data: {updateData}") # First check if user has permission to modify mandates if not self._canModify("mandates", mandateId): raise PermissionError(f"No permission to update mandate {mandateId}") # Get mandate with access control mandate = self.getMandate(mandateId) logger.debug(f"Retrieved mandate: {mandate}") if not mandate: raise ValueError(f"Mandate {mandateId} not found") # Update mandate data using model updatedData = mandate.to_dict() updatedData.update(updateData) logger.debug(f"Updated data: {updatedData}") updatedMandate = Mandate.from_dict(updatedData) # Update mandate record self.db.recordModify("mandates", mandateId, updatedMandate.to_dict()) # Clear cache to ensure fresh data self._clearTableCache("mandates") # Get updated mandate updatedMandate = self.getMandate(mandateId) if not updatedMandate: raise ValueError("Failed to retrieve updated mandate") return updatedMandate except Exception as e: logger.error(f"Error updating mandate: {str(e)}") raise ValueError(f"Failed to update mandate: {str(e)}") def deleteMandate(self, mandateId: str) -> bool: """Deletes a mandate if user has access.""" try: # Check if mandate exists and user has access mandate = self.getMandate(mandateId) if not mandate: return False if not self._canModify("mandates", mandateId): raise PermissionError(f"No permission to delete mandate {mandateId}") # Check if mandate has users users = self.getUsersByMandate(mandateId) if users: raise ValueError(f"Cannot delete mandate {mandateId} with existing users") # Delete mandate success = self.db.recordDelete("mandates", mandateId) # Clear cache to ensure fresh data self._clearTableCache("mandates") return success except Exception as e: logger.error(f"Error deleting mandate: {str(e)}") raise ValueError(f"Failed to delete mandate: {str(e)}") def _getInitialUser(self) -> Optional[Dict[str, Any]]: """Get the initial user record directly from database without access control.""" try: initialUserId = self.db.getInitialId("users") if not initialUserId: return None users = self.db.getRecordset("users", recordFilter={"id": initialUserId}) return users[0] if users else None except Exception as e: logger.error(f"Error getting initial user: {str(e)}") return None def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]: """Checks if a username is available for registration.""" try: username = checkData.get("username") authenticationAuthority = checkData.get("authenticationAuthority", "local") if not username: return { "available": False, "message": "Username is required" } # Get user by username user = self.getUserByUsername(username) # Check if user exists (User model instance) if user is not None: return { "available": False, "message": "Username is already taken" } return { "available": True, "message": "Username is available" } except Exception as e: logger.error(f"Error checking username availability: {str(e)}") return { "available": False, "message": f"Error checking username availability: {str(e)}" } def saveToken(self, token: Token) -> None: """Save a token for the current user""" try: # Validate user context if not self.currentUser or not self.currentUser.id: raise ValueError("No valid user context available for token storage") # Set the user ID and mandate ID token.userId = self.currentUser.id # Ensure token has required fields if not token.id: token.id = str(uuid.uuid4()) if not token.createdAt: token.createdAt = datetime.now() # Convert to dict and ensure all fields are properly set token_dict = token.dict() # Ensure userId is set to current user (this might override the token's userId) token_dict["userId"] = self.currentUser.id # Convert datetime objects to ISO format strings if isinstance(token_dict.get("createdAt"), datetime): token_dict["createdAt"] = token_dict["createdAt"].isoformat() if isinstance(token_dict.get("expiresAt"), datetime): token_dict["expiresAt"] = token_dict["expiresAt"].isoformat() # Save to database self.db.recordCreate("tokens", token_dict) # Clear cache to ensure fresh data self._clearTableCache("tokens") except Exception as e: logger.error(f"Error saving token: {str(e)}") raise def getToken(self, authority: str, auto_refresh: bool = True) -> Optional[Token]: """Get the latest valid token for the current user and authority, optionally auto-refresh if expired""" try: # Get tokens for this user and authority tokens = self.db.getRecordset("tokens", recordFilter={ "userId": self.currentUser.id, "authority": authority }) if not tokens: return None # Sort by creation date and get the latest tokens.sort(key=lambda x: x.get("createdAt", ""), reverse=True) latest_token = Token(**tokens[0]) # Check if token is expired if latest_token.expiresAt and latest_token.expiresAt < datetime.now().timestamp(): if auto_refresh: # Import TokenManager here to avoid circular imports from modules.security.tokenManager import TokenManager token_manager = TokenManager() # Try to refresh the token refreshed_token = token_manager.refresh_token(latest_token) if refreshed_token: # Save the new token and delete the old one self.saveToken(refreshed_token) self.deleteToken(authority) return refreshed_token else: logger.warning(f"Failed to refresh expired token for {authority}") return None else: logger.warning(f"Token for {authority} is expired (expiresAt: {latest_token.expiresAt})") return None return latest_token except Exception as e: logger.error(f"Error getting token: {str(e)}") return None def getTokenForConnection(self, connectionId: str, auto_refresh: bool = True) -> Optional[Token]: """Get the token for a specific connection, optionally auto-refresh if expired""" try: # Get token for this specific connection tokens = self.db.getRecordset("tokens", recordFilter={ "connectionId": connectionId }) if not tokens: logger.warning(f"No token found for connection: {connectionId}") return None # Sort by creation date and get the latest tokens.sort(key=lambda x: x.get("createdAt", ""), reverse=True) latest_token = Token(**tokens[0]) # Check if token is expired if latest_token.expiresAt and latest_token.expiresAt < datetime.now().timestamp(): if auto_refresh: # Import TokenManager here to avoid circular imports from modules.security.tokenManager import TokenManager token_manager = TokenManager() # Try to refresh the token refreshed_token = token_manager.refresh_token(latest_token) if refreshed_token: # Save the new token and delete the old one self.saveToken(refreshed_token) self.deleteTokenByConnectionId(connectionId) return refreshed_token else: logger.warning(f"Failed to refresh expired token for connection {connectionId}") return None else: logger.warning(f"Token for connection {connectionId} is expired (expiresAt: {latest_token.expiresAt})") return None return latest_token except Exception as e: logger.error(f"Error getting token for connection {connectionId}: {str(e)}") return None def deleteToken(self, authority: str) -> None: """Delete all tokens for the current user and authority""" try: # Get tokens to delete tokens = self.db.getRecordset("tokens", recordFilter={ "userId": self.currentUser.id, "authority": authority }) # Delete each token for token in tokens: self.db.recordDelete("tokens", token["id"]) # Clear cache to ensure fresh data self._clearTableCache("tokens") except Exception as e: logger.error(f"Error deleting token: {str(e)}") raise def deleteTokenByConnectionId(self, connectionId: str) -> None: """Delete all tokens for a specific connection""" try: # Get tokens to delete tokens = self.db.getRecordset("tokens", recordFilter={ "connectionId": connectionId }) # Delete each token for token in tokens: self.db.recordDelete("tokens", token["id"]) # Clear cache to ensure fresh data self._clearTableCache("tokens") except Exception as e: logger.error(f"Error deleting token for connection {connectionId}: {str(e)}") raise def logout(self) -> None: """Logout current user - clear user context and tokens""" try: # Clear user context self.currentUser = None self.userId = None self.mandateId = None self.access = None # Clear database context if hasattr(self, 'db'): self.db.updateContext("") logger.info("User logged out successfully") except Exception as e: logger.error(f"Error during logout: {str(e)}") raise # Public Methods def getInterface(currentUser: User) -> AppObjects: """ Returns a AppObjects instance for the current user. Handles initialization of database and records. """ if not currentUser: raise ValueError("Invalid user context: user is required") # Create context key contextKey = f"{currentUser.mandateId}_{currentUser.id}" # Create new instance if not exists if contextKey not in _gatewayInterfaces: _gatewayInterfaces[contextKey] = AppObjects(currentUser) return _gatewayInterfaces[contextKey] def getRootUser() -> User: """ Returns the root user from the database. This is the user with the initial ID in the users table. """ try: # Create a temporary interface without user context tempInterface = AppObjects() # Get the initial user directly initialUserId = tempInterface.db.getInitialId("users") if not initialUserId: raise ValueError("No initial user ID found in database") users = tempInterface.db.getRecordset("users", recordFilter={"id": initialUserId}) if not users: raise ValueError("Initial user not found in database") # Convert to User model and return the model instance return User.from_dict(users[0]) except Exception as e: logger.error(f"Error getting root user: {str(e)}") raise ValueError(f"Failed to get root user: {str(e)}") def getRootInterface() -> AppObjects: """ Returns a AppObjects instance with root privileges. This is used for initial setup and user creation. """ global _rootAppObjects if _rootAppObjects is None: rootUser = getRootUser() _rootAppObjects = AppObjects(rootUser) return _rootAppObjects