""" Interface to the Gateway system. Manages users and mandates for authentication. """ import os import logging from typing import Dict, Any, List, Optional, Union import importlib from passlib.context import CryptContext from connectors.connectorDbJson import DatabaseConnector from modules.configuration import APP_CONFIG logger = logging.getLogger(__name__) # Password-Hashing pwdContext = CryptContext(schemes=["argon2"], deprecated="auto") class GatewayInterface: """ Interface to the Gateway system. Manages users and mandates. """ def __init__(self, mandateId: int = None, userId: int = None): """ Initializes the Gateway Interface with optional mandate and user context. Args: mandateId: ID of the current mandate (optional) userId: ID of the current user (optional) """ # Context can be empty during initialization self.mandateId = mandateId self.userId = userId # Import data model module try: self.modelModule = importlib.import_module("modules.gatewayModel") logger.info("gatewayModel successfully imported") except ImportError as e: logger.error(f"Error importing gatewayModel: {e}") raise # Initialize database self._initializeDatabase() def _initializeDatabase(self): """ Initializes the database with minimal objects """ self.db = DatabaseConnector( dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"), dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"), dbUser=APP_CONFIG.get("DB_SYSTEM_USER"), dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"), mandateId=self.mandateId if self.mandateId else 0, userId=self.userId if self.userId else 0 ) # Create Root mandate if needed existingMandateId = self.getInitialId("mandates") mandates = self.db.getRecordset("mandates") if existingMandateId is None or not mandates: logger.info("Creating Root mandate") rootMandate = { "name": "Root", "language": "de" } createdMandate = self.db.recordCreate("mandates", rootMandate) logger.info(f"Root mandate created with ID {createdMandate['id']}") # Update mandate context self.mandateId = createdMandate['id'] self.userId = createdMandate['userId'] # Recreate connector with correct context self.db = DatabaseConnector( dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"), dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"), dbUser=APP_CONFIG.get("DB_SYSTEM_USER"), dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"), mandateId=self.mandateId, userId=self.userId ) # Create Admin user if needed existingUserId = self.getInitialId("users") users = self.db.getRecordset("users") if existingUserId is None or not users: logger.info("Creating Admin user") adminUser = { "mandateId": self.mandateId, "username": "admin", "email": "admin@example.com", "fullName": "Administrator", "disabled": False, "language": "de", "privilege": "sysadmin", # SysAdmin privilege "hashedPassword": self._getPasswordHash("admin") # Use a secure password in production! } createdUser = self.db.recordCreate("users", adminUser) logger.info(f"Admin user created with ID {createdUser['id']}") # Update user context self.userId = createdUser['id'] # Recreate connector with correct context self.db = DatabaseConnector( dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"), dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"), dbUser=APP_CONFIG.get("DB_SYSTEM_USER"), dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"), mandateId=self.mandateId, userId=self.userId ) def getInitialId(self, table: str) -> Optional[int]: """Returns the initial ID for a table""" return self.db.getInitialId(table) def _getPasswordHash(self, password: str) -> str: """Creates a hash for a password""" return pwdContext.hash(password) def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool: """Checks if the password matches the hash""" return pwdContext.verify(plainPassword, hashedPassword) def _getCurrentTimestamp(self) -> str: """Returns the current timestamp in ISO format""" from datetime import datetime return datetime.now().isoformat() # Mandate methods def getAllMandates(self) -> List[Dict[str, Any]]: """Returns all mandates""" return self.db.getRecordset("mandates") def getMandate(self, mandateId: int) -> Optional[Dict[str, Any]]: """Returns a mandate by its ID""" mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId}) if mandates: return mandates[0] return None def createMandate(self, name: str, language: str = "de") -> Dict[str, Any]: """Creates a new mandate""" mandateData = { "name": name, "language": language } return self.db.recordCreate("mandates", mandateData) def updateMandate(self, mandateId: int, mandateData: Dict[str, Any]) -> Dict[str, Any]: """ Updates an existing mandate Args: mandateId: The ID of the mandate to update mandateData: The mandate data to update Returns: Dict[str, Any]: The updated mandate data Raises: ValueError: If the mandate is not found """ # Check if the mandate exists mandate = self.getMandate(mandateId) if not mandate: raise ValueError(f"Mandate with ID {mandateId} not found") # Update the mandate updatedMandate = self.db.recordModify("mandates", mandateId, mandateData) return updatedMandate def deleteMandate(self, mandateId: int) -> bool: """ Deletes a mandate and all associated users and data Args: mandateId: The ID of the mandate to delete Returns: bool: True if the mandate was successfully deleted, otherwise False """ # Check if the mandate exists mandate = self.getMandate(mandateId) if not mandate: return False # Check if it's the initial mandate initialMandateId = self.getInitialId("mandates") if initialMandateId is not None and mandateId == initialMandateId: logger.warning(f"Attempt to delete the Root mandate was prevented") return False # Find all users of the mandate users = self.getUsersByMandate(mandateId) # Delete all users of the mandate and their associated data for user in users: self.deleteUser(user["id"]) # Delete the mandate success = self.db.recordDelete("mandates", mandateId) if success: logger.info(f"Mandate with ID {mandateId} was successfully deleted") else: logger.error(f"Error deleting mandate with ID {mandateId}") return success # User methods def getAllUsers(self) -> List[Dict[str, Any]]: """Returns all users""" users = self.db.getRecordset("users") # Remove password hashes from the response for user in users: if "hashedPassword" in user: del user["hashedPassword"] return users def getUsersByMandate(self, mandateId: int) -> List[Dict[str, Any]]: """ Returns all users of a specific mandate Args: mandateId: The ID of the mandate Returns: List[Dict[str, Any]]: List of users in the mandate """ users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId}) # Remove password hashes from the response for user in users: if "hashedPassword" in user: del user["hashedPassword"] return users def getUserByUsername(self, username: str) -> Optional[Dict[str, Any]]: """Returns a user by username""" users = self.db.getRecordset("users") for user in users: if user.get("username") == username: return user return None def getUser(self, userId: int) -> Optional[Dict[str, Any]]: """Returns a user by ID""" users = self.db.getRecordset("users", recordFilter={"id": userId}) if users: user = users[0] # Remove password hash from the API response if "hashedPassword" in user: userCopy = user.copy() del userCopy["hashedPassword"] return userCopy return user return None def createUser(self, username: str, password: str, email: str = None, fullName: str = None, language: str = "de", mandateId: int = None, disabled: bool = False, privilege: str = "user") -> Dict[str, Any]: """ Creates a new user Args: username: The username password: The password email: The email address (optional) fullName: The full name (optional) language: The preferred language (default: "de") mandateId: The ID of the mandate (optional) disabled: Whether the user is disabled (default: False) privilege: The privilege level (default: "user") Returns: Dict[str, Any]: The created user data Raises: ValueError: If the username already exists """ # Check if the username already exists existingUser = self.getUserByUsername(username) if existingUser: raise ValueError(f"User '{username}' already exists") # Use the provided mandateId or the current context userMandateId = mandateId if mandateId is not None else self.mandateId userData = { "mandateId": userMandateId, "username": username, "email": email, "fullName": fullName, "disabled": disabled, "language": language, "privilege": privilege, "hashedPassword": self._getPasswordHash(password) } createdUser = self.db.recordCreate("users", userData) # Remove password hash from the response if "hashedPassword" in createdUser: del createdUser["hashedPassword"] return createdUser def authenticateUser(self, username: str, password: str) -> Optional[Dict[str, Any]]: """ Authenticates a user by username and password Args: username: The username password: The password Returns: Optional[Dict[str, Any]]: The user data or None if authentication fails """ user = self.getUserByUsername(username) if not user: return None if not self._verifyPassword(password, user.get("hashedPassword", "")): return None # Check if the user is disabled if user.get("disabled", False): return None # Create a copy without password hash authenticatedUser = {**user} if "hashedPassword" in authenticatedUser: del authenticatedUser["hashedPassword"] return authenticatedUser def updateUser(self, userId: int, userData: Dict[str, Any]) -> Dict[str, Any]: """ Updates a user Args: userId: The ID of the user to update userData: The user data to update Returns: Dict[str, Any]: The updated user data Raises: ValueError: If the user is not found """ # Get the current user with password hash (directly from DB) users = self.db.getRecordset("users", recordFilter={"id": userId}) if not users: raise ValueError(f"User with ID {userId} not found") user = users[0] # If the password is being changed, hash it if "password" in userData: userData["hashedPassword"] = self._getPasswordHash(userData["password"]) del userData["password"] # Update the user updatedUser = self.db.recordModify("users", userId, userData) # Remove password hash from the response if "hashedPassword" in updatedUser: del updatedUser["hashedPassword"] return updatedUser def disableUser(self, userId: int) -> Dict[str, Any]: """Disables a user""" return self.updateUser(userId, {"disabled": True}) def enableUser(self, userId: int) -> Dict[str, Any]: """Enables a user""" return self.updateUser(userId, {"disabled": False}) def _deleteUserReferencedData(self, userId: int) -> None: """ Deletes all data associated with a user Args: userId: The ID of the user """ # Here all tables are searched and all entries referencing this user are deleted # Delete user attributes try: attributes = self.db.getRecordset("attributes", recordFilter={"userId": userId}) for attribute in attributes: self.db.recordDelete("attributes", attribute["id"]) except Exception as e: logger.error(f"Error deleting attributes for user {userId}: {e}") # Other tables that might reference the user # (Depending on the application's database structure) logger.info(f"All referenced data for user {userId} has been deleted") def deleteUser(self, userId: int) -> bool: """ Deletes a user and all associated data Args: userId: The ID of the user to delete Returns: bool: True if the user was successfully deleted, otherwise False """ # Check if the user exists users = self.db.getRecordset("users", recordFilter={"id": userId}) if not users: return False # Check if it's the initial user initialUserId = self.getInitialId("users") if initialUserId is not None and userId == initialUserId: logger.warning("Attempt to delete the Root Admin was prevented") return False # Delete all data associated with the user self._deleteUserReferencedData(userId) # Delete the user success = self.db.recordDelete("users", userId) if success: logger.info(f"User with ID {userId} was successfully deleted") else: logger.error(f"Error deleting user with ID {userId}") return success # Singleton factory for GatewayInterface instances per context _gatewayInterfaces = {} def getGatewayInterface(mandateId: int = None, userId: int = None) -> GatewayInterface: """ Returns a GatewayInterface instance for the specified context. Reuses existing instances. Args: mandateId: ID of the mandate userId: ID of the user Returns: GatewayInterface instance """ contextKey = f"{mandateId}_{userId}" if contextKey not in _gatewayInterfaces: _gatewayInterfaces[contextKey] = GatewayInterface(mandateId, userId) return _gatewayInterfaces[contextKey] # Initialize the interface getGatewayInterface()