"""
Interface to the Gateway system.

Manages users and mandates for authentication.
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Dict, Any, List, Optional, Union
|
|
import importlib
|
|
from passlib.context import CryptContext
|
|
|
|
from connectors.connectorDbJson import DatabaseConnector
|
|
from modules.configuration import APP_CONFIG
|
|
from modules.gatewayAccess import _uam, _canModify
|
|
|
|
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Password hashing: passlib context with argon2 as the only configured scheme
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
|
|
|
|
|
|
class GatewayInterface:
    """
    Interface to the Gateway system.

    Manages users and mandates. Reads are passed through the access filter
    (`_uam`) and writes are guarded by `_canModify`, both evaluated for the
    mandate/user context this instance was created with.
    """

    def __init__(self, mandateId: Optional[int] = None, userId: Optional[int] = None):
        """Initializes the Gateway Interface with optional mandate and user context.

        Args:
            mandateId: Mandate context; may be omitted during initialization
            userId: User context; may be omitted during initialization

        Raises:
            ImportError: If "modules.gatewayModel" cannot be imported
        """
        # Context can be empty during initialization
        self.mandateId = mandateId
        self.userId = userId

        # Import data model module
        try:
            self.modelModule = importlib.import_module("modules.gatewayModel")
            logger.info("gatewayModel successfully imported")
        except ImportError as e:
            logger.error(f"Error importing gatewayModel: {e}")
            raise

        # Initialize database
        self._initializeDatabase()

        # Load user information
        self.currentUser = self._getCurrentUserInfo()

        # Initialize standard records if needed
        self._initRecords()

    def _getCurrentUserInfo(self) -> Dict[str, Any]:
        """Gets information about the current user including privileges.

        Returns:
            Dict with "id", "mandateId", "privilege" and "language". Privilege
            and language default to "user" / "en" and are overridden from the
            stored user record when one is found for `self.userId`.
        """
        # For initialization, set default values
        userInfo = {
            "id": self.userId,
            "mandateId": self.mandateId,
            "privilege": "user",  # Default privilege level
            "language": "en",
        }

        # Try to load actual user info if an ID is provided
        if self.userId:
            userRecords = self.db.getRecordset("users", recordFilter={"id": self.userId})
            if userRecords:
                user = userRecords[0]
                userInfo["privilege"] = user.get("privilege", "user")
                userInfo["language"] = user.get("language", "en")

        return userInfo

    def _initializeDatabase(self):
        """Initializes the database connection from the DB_SYSTEM_* settings."""
        self.db = DatabaseConnector(
            dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
            dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
            dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
            dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
            # The connector receives 0 when no context is set
            mandateId=self.mandateId if self.mandateId else 0,
            userId=self.userId if self.userId else 0,
        )

    def _initRecords(self):
        """Initializes standard records in the database if they don't exist."""
        # The mandate is created first so the admin user can reference its ID
        self._initRootMandate()
        self._initAdminUser()

    def _initRootMandate(self):
        """Creates the Root mandate if it doesn't exist."""
        existingMandateId = self.getInitialId("mandates")
        mandates = self.db.getRecordset("mandates")
        if existingMandateId is None or not mandates:
            logger.info("Creating Root mandate")
            rootMandate = {
                "name": "Root",
                "language": "de",
            }
            createdMandate = self.db.recordCreate("mandates", rootMandate)
            logger.info(f"Root mandate created with ID {createdMandate['id']}")

            # Update mandate context
            self.mandateId = createdMandate['id']

    def _initAdminUser(self):
        """Creates the Admin user if it doesn't exist."""
        existingUserId = self.getInitialId("users")
        users = self.db.getRecordset("users")
        if existingUserId is None or not users:
            logger.info("Creating Admin user")
            adminUser = {
                "mandateId": self.mandateId,
                "username": "admin",
                "email": "admin@example.com",
                "fullName": "Administrator",
                "disabled": False,
                "language": "de",
                "privilege": "sysadmin",
                # NOTE(review): hard-coded bootstrap password - use a secure
                # password in production!
                "hashedPassword": self._getPasswordHash("The 1st Poweron Admin"),
            }
            createdUser = self.db.recordCreate("users", adminUser)
            logger.info(f"Admin user created with ID {createdUser['id']}")

            # Update user context
            self.userId = createdUser['id']

    def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Unified user access management function that filters data based on user privileges
        and adds access control attributes.

        Args:
            table: Name of the table
            recordset: Recordset to filter based on access rules

        Returns:
            Filtered recordset with access control attributes
        """
        # Delegates to the module-level _uam imported from modules.gatewayAccess
        return _uam(self.currentUser, table, recordset, self.mandateId, self.userId, self.db)

    def _canModify(self, table: str, recordId: Optional[int] = None) -> bool:
        """
        Checks if the current user can modify (create/update/delete) records in a table.

        Args:
            table: Name of the table
            recordId: Optional record ID for specific record check

        Returns:
            Boolean indicating permission
        """
        # Delegates to the module-level _canModify imported from modules.gatewayAccess
        return _canModify(self.currentUser, table, recordId, self.mandateId, self.userId, self.db)

    def getInitialId(self, table: str) -> Optional[int]:
        """Returns the initial ID for a table, as reported by the database connector."""
        return self.db.getInitialId(table)

    def _getPasswordHash(self, password: str) -> str:
        """Creates a hash for a password."""
        return pwdContext.hash(password)

    def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
        """Checks if the password matches the hash."""
        return pwdContext.verify(plainPassword, hashedPassword)

    def _getCurrentTimestamp(self) -> str:
        """Returns the current local timestamp in ISO format."""
        from datetime import datetime
        return datetime.now().isoformat()

    @staticmethod
    def _withoutPasswordHash(user: Dict[str, Any]) -> Dict[str, Any]:
        """Returns a shallow copy of a user record without "hashedPassword".

        A copy is returned so that records handed out by the database
        connector are never stripped of their hash in place.
        """
        return {key: value for key, value in user.items() if key != "hashedPassword"}

    # Mandate methods

    def getAllMandates(self) -> List[Dict[str, Any]]:
        """Returns mandates based on user access level."""
        allMandates = self.db.getRecordset("mandates")
        return self._uam("mandates", allMandates)

    def getMandate(self, mandateId: int) -> Optional[Dict[str, Any]]:
        """Returns a mandate by ID if user has access, otherwise None."""
        mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
        if not mandates:
            return None

        filteredMandates = self._uam("mandates", mandates)
        return filteredMandates[0] if filteredMandates else None

    def createMandate(self, name: str, language: str = "de") -> Dict[str, Any]:
        """Creates a new mandate if user has permission.

        Raises:
            PermissionError: If the current user may not modify "mandates"
        """
        if not self._canModify("mandates"):
            raise PermissionError("No permission to create mandates")

        mandateData = {
            "name": name,
            "language": language,
        }
        return self.db.recordCreate("mandates", mandateData)

    def updateMandate(self, mandateId: int, mandateData: Dict[str, Any]) -> Dict[str, Any]:
        """Updates a mandate if user has access.

        Raises:
            ValueError: If the mandate does not exist or is not visible to the user
            PermissionError: If the current user may not modify the mandate
        """
        # Check if the mandate exists and user has access
        mandate = self.getMandate(mandateId)
        if not mandate:
            raise ValueError(f"Mandate with ID {mandateId} not found")

        if not self._canModify("mandates", mandateId):
            raise PermissionError(f"No permission to update mandate {mandateId}")

        # Update the mandate
        return self.db.recordModify("mandates", mandateId, mandateData)

    def deleteMandate(self, mandateId: int) -> bool:
        """
        Deletes a mandate and all associated users and data if user has permission.

        Returns:
            The result of the connector's delete call; False if the mandate is
            not found/visible or is the initial (Root) mandate.

        Raises:
            PermissionError: If the current user may not modify the mandate
                (or one of its users, via deleteUser)
        """
        # Check if the mandate exists and user has access
        mandate = self.getMandate(mandateId)
        if not mandate:
            return False

        if not self._canModify("mandates", mandateId):
            raise PermissionError(f"No permission to delete mandate {mandateId}")

        # Check if it's the initial mandate
        initialMandateId = self.getInitialId("mandates")
        if initialMandateId is not None and mandateId == initialMandateId:
            logger.warning("Attempt to delete the Root mandate was prevented")
            return False

        # Delete all users of the mandate and their associated data.
        # Only users returned by getUsersByMandate (i.e. after access
        # filtering) are removed here.
        for user in self.getUsersByMandate(mandateId):
            self.deleteUser(user["id"])

        # Delete the mandate
        success = self.db.recordDelete("mandates", mandateId)

        if success:
            logger.info(f"Mandate with ID {mandateId} was successfully deleted")
        else:
            logger.error(f"Error deleting mandate with ID {mandateId}")

        return success

    # User methods

    def getAllUsers(self) -> List[Dict[str, Any]]:
        """Returns users based on user access level, without password hashes."""
        allUsers = self.db.getRecordset("users")
        filteredUsers = self._uam("users", allUsers)

        # Strip password hashes on copies, never on the connector's records
        return [self._withoutPasswordHash(user) for user in filteredUsers]

    def getUsersByMandate(self, mandateId: int) -> List[Dict[str, Any]]:
        """Returns users for a specific mandate if user has access.

        Returns:
            Access-filtered users of the mandate without password hashes;
            an empty list if the mandate is not found or not visible.
        """
        # First check if user has access to the mandate
        mandate = self.getMandate(mandateId)
        if not mandate:
            return []

        # Get users for this mandate
        users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId})
        filteredUsers = self._uam("users", users)

        # Strip password hashes on copies, never on the connector's records
        return [self._withoutPasswordHash(user) for user in filteredUsers]

    def getUserByUsername(self, username: str) -> Optional[Dict[str, Any]]:
        """Returns a user by username, or None if no user has that name.

        The record is returned as stored: no access filtering is applied and
        "hashedPassword" is not removed.
        """
        users = self.db.getRecordset("users")
        return next((user for user in users if user.get("username") == username), None)

    def getUser(self, userId: int) -> Optional[Dict[str, Any]]:
        """Returns a user by ID (without password hash) if user has access, otherwise None."""
        users = self.db.getRecordset("users", recordFilter={"id": userId})
        if not users:
            return None

        filteredUsers = self._uam("users", users)
        if not filteredUsers:
            return None

        # Remove password hash (on a copy)
        return self._withoutPasswordHash(filteredUsers[0])

    def createUser(self, username: str, password: str, email: str = None,
                   fullName: str = None, language: str = "de", mandateId: int = None,
                   disabled: bool = False, privilege: str = "user") -> Dict[str, Any]:
        """Creates a new user if current user has permission.

        Args:
            username: Login name; must not exist yet
            password: Plain-text password; only its hash is stored
            email: Optional e-mail address
            fullName: Optional display name
            language: Language code stored on the user
            mandateId: Target mandate; defaults to this instance's mandate context
            disabled: Whether the user is created disabled
            privilege: Privilege level to assign

        Returns:
            The created user record without the password hash

        Raises:
            ValueError: If the username already exists
            PermissionError: If the current user may not create users in the
                target mandate, may not modify "users", or requests a
                privilege that is not allowed
        """
        # Check if the username already exists
        existingUser = self.getUserByUsername(username)
        if existingUser:
            raise ValueError(f"User '{username}' already exists")

        # Use the provided mandateId or the current context
        userMandateId = mandateId if mandateId is not None else self.mandateId

        # Only sysadmins may create users outside their own mandate
        if userMandateId != self.mandateId and self.currentUser.get("privilege") != "sysadmin":
            raise PermissionError(f"No permission to create users in mandate {userMandateId}")

        if not self._canModify("users"):
            raise PermissionError("No permission to create users")

        # Check privilege escalation: "sysadmin" can never be assigned here,
        # and a plain "user" cannot create an "admin"
        if (privilege == "sysadmin" or
                (privilege == "admin" and self.currentUser.get("privilege") == "user")):
            raise PermissionError(f"Cannot create user with higher privilege: {privilege}")

        userData = {
            "mandateId": userMandateId,
            "username": username,
            "email": email,
            "fullName": fullName,
            "disabled": disabled,
            "language": language,
            "privilege": privilege,
            "hashedPassword": self._getPasswordHash(password),
        }

        createdUser = self.db.recordCreate("users", userData)

        # Remove password hash from the response (on a copy)
        return self._withoutPasswordHash(createdUser)

    def authenticateUser(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Authenticates a user by username and password.

        Returns:
            A copy of the user record without the password hash

        Raises:
            ValueError: If the user is unknown, the password does not match,
                or the user is disabled
        """
        # Clear the users table from the connector cache so fresh data is
        # loaded; tolerate connectors that do not expose this cache
        tablesCache = getattr(self.db, "_tablesCache", None)
        if tablesCache is not None:
            tablesCache.pop("users", None)

        # Get fresh user data
        users = self.db.getRecordset("users")
        user = next((u for u in users if u.get("username") == username), None)

        if not user:
            raise ValueError("Benutzer nicht gefunden")

        # A record without a stored hash can never authenticate; reject it
        # here instead of handing an empty hash to the verifier
        storedHash = user.get("hashedPassword")
        if not storedHash or not self._verifyPassword(password, storedHash):
            raise ValueError("Falsches Passwort")

        # Check if the user is disabled
        if user.get("disabled", False):
            raise ValueError("Benutzer ist deaktiviert")

        # Return a copy without password hash
        return self._withoutPasswordHash(user)

    def updateUser(self, userId: int, userData: Dict[str, Any]) -> Dict[str, Any]:
        """Updates a user if current user has permission.

        Args:
            userId: ID of the user to update
            userData: Fields to change; a "password" entry is stored as
                "hashedPassword". The passed dict itself is not modified.

        Returns:
            The updated user record without the password hash

        Raises:
            ValueError: If the user does not exist
            PermissionError: If the current user may not update the user or
                tries to escalate the privilege
        """
        # Check if the user exists and current user has access
        user = self.getUser(userId)
        if not user:
            # Not visible through the access filter: look at the raw record
            users = self.db.getRecordset("users", recordFilter={"id": userId})
            if not users:
                raise ValueError(f"User with ID {userId} not found")

            # NOTE(review): _canModify is only consulted on this path, i.e.
            # when the target is not visible via getUser - confirm intended
            if not self._canModify("users", userId):
                raise PermissionError(f"No permission to update user {userId}")

        # Check privilege escalation
        if "privilege" in userData:
            currentPrivilege = self.currentUser.get("privilege")
            targetPrivilege = userData["privilege"]

            if (targetPrivilege == "sysadmin" and currentPrivilege != "sysadmin") or (
                    targetPrivilege == "admin" and currentPrivilege == "user"):
                raise PermissionError(f"Cannot escalate privilege to {targetPrivilege}")

        # Work on a copy so the caller's dict is left untouched
        changes = dict(userData)

        # If the password is being changed, hash it
        if "password" in changes:
            changes["hashedPassword"] = self._getPasswordHash(changes.pop("password"))

        # Update the user
        updatedUser = self.db.recordModify("users", userId, changes)

        # Remove password hash from the response (on a copy)
        return self._withoutPasswordHash(updatedUser)

    def disableUser(self, userId: int) -> Dict[str, Any]:
        """Disables a user if current user has permission."""
        return self.updateUser(userId, {"disabled": True})

    def enableUser(self, userId: int) -> Dict[str, Any]:
        """Enables a user if current user has permission."""
        return self.updateUser(userId, {"disabled": False})

    def _deleteUserReferencedData(self, userId: int) -> None:
        """Deletes all "attributes" records associated with a user.

        Best effort: failures are logged and not propagated.
        """
        # Delete user attributes
        try:
            attributes = self.db.getRecordset("attributes", recordFilter={"userId": userId})
            for attribute in attributes:
                self.db.recordDelete("attributes", attribute["id"])
        except Exception as e:
            logger.error(f"Error deleting attributes for user {userId}: {e}")
        else:
            # Only report success when the deletion did not raise
            logger.info(f"All referenced data for user {userId} has been deleted")

    def deleteUser(self, userId: int) -> bool:
        """Deletes a user and all associated data if current user has permission.

        Returns:
            The result of the connector's delete call; False if the user does
            not exist or is the initial (Root Admin) user.

        Raises:
            PermissionError: If the current user may not modify the user
        """
        # Check if the user exists
        users = self.db.getRecordset("users", recordFilter={"id": userId})
        if not users:
            return False

        # Check if current user has permission
        if not self._canModify("users", userId):
            raise PermissionError(f"No permission to delete user {userId}")

        # Check if it's the initial user
        initialUserId = self.getInitialId("users")
        if initialUserId is not None and userId == initialUserId:
            logger.warning("Attempt to delete the Root Admin was prevented")
            return False

        # Delete all data associated with the user
        self._deleteUserReferencedData(userId)

        # Delete the user
        success = self.db.recordDelete("users", userId)

        if success:
            logger.info(f"User with ID {userId} was successfully deleted")
        else:
            logger.error(f"Error deleting user with ID {userId}")

        return success
|
|
|
|
|
|
# Registry of GatewayInterface instances, one per "<mandateId>_<userId>" context
_gatewayInterfaces = {}


def getGatewayInterface(mandateId: int = None, userId: int = None) -> GatewayInterface:
    """
    Returns the GatewayInterface for the given mandate/user context.

    The first request for a context constructs the instance and stores it in
    the module-level registry; later requests for the same context get that
    stored instance back.
    """
    contextKey = f"{mandateId}_{userId}"
    instance = _gatewayInterfaces.get(contextKey)
    if instance is None:
        # Construct first, register second: a failing constructor leaves
        # the registry untouched
        instance = GatewayInterface(mandateId, userId)
        _gatewayInterfaces[contextKey] = instance
    return instance
|
|
|
|
# Create the context-free instance at import time. Its constructor runs
# _initRecords(), which creates the Root mandate and Admin user when missing.
getGatewayInterface()