gateway/modules/interfaces/gatewayInterface.py

517 lines
No EOL
20 KiB
Python

"""
Interface to the Gateway system.
Manages users and mandates for authentication.
"""
import os
import logging
from typing import Dict, Any, List, Optional, Union
import importlib
from passlib.context import CryptContext
from modules.connectors.connectorDbJson import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.gatewayAccess import _uam, _canModify
logger = logging.getLogger(__name__)
# Password-Hashing
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
class GatewayInterface:
"""
Interface to the Gateway system.
Manages users and mandates.
"""
def __init__(self, _mandateId: str = None, _userId: str = None):
"""Initializes the Gateway Interface with optional mandate and user context."""
# Context can be empty during initialization
self._mandateId = _mandateId
self._userId = _userId
# Initialize database
self._initializeDatabase()
# Load user information
self.currentUser = self._getCurrentUserInfo()
# Initialize standard records if needed
self._initRecords()
def _initializeDatabase(self):
"""Initializes the database connection."""
# Get configuration values with defaults
dbHost = APP_CONFIG.get("DB_GATEWAY_HOST", "data")
dbDatabase = APP_CONFIG.get("DB_GATEWAY_DATABASE", "gateway")
dbUser = APP_CONFIG.get("DB_GATEWAY_USER")
dbPassword = APP_CONFIG.get("DB_GATEWAY_PASSWORD_SECRET")
# Ensure the database directory exists
os.makedirs(dbHost, exist_ok=True)
self.db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
_mandateId=self._mandateId,
_userId=self._userId
)
def _getCurrentUserInfo(self) -> Optional[Dict[str, Any]]:
"""Returns information about the current user."""
if not self._userId:
return None
users = self.db.getRecordset("users", recordFilter={"id": self._userId})
if users:
return users[0]
return None
def _initRecords(self):
"""Initializes standard records in the database if they don't exist."""
self._initRootMandate()
self._initAdminUser()
# Update database context with new IDs
if self._mandateId and self._userId:
self.db.updateContext(self._mandateId, self._userId)
# Reload user information with new context
self.currentUser = self._getCurrentUserInfo()
def _initRootMandate(self):
"""Creates the Root mandate if it doesn't exist."""
existingMandateId = self.getInitialId("mandates")
mandates = self.db.getRecordset("mandates")
if existingMandateId is None or not mandates:
logger.info("Creating Root mandate")
rootMandate = {
"name": "Root",
"language": "de"
}
createdMandate = self.db.recordCreate("mandates", rootMandate)
logger.info(f"Root mandate created with ID {createdMandate['id']}")
# Register the initial ID
self.db._registerInitialId("mandates", createdMandate['id'])
# Update mandate context
self._mandateId = createdMandate['id']
def _initAdminUser(self):
"""Creates the Admin user if it doesn't exist."""
existingUserId = self.getInitialId("users")
users = self.db.getRecordset("users")
if existingUserId is None or not users:
logger.info("Creating Admin user")
adminUser = {
"_mandateId": self._mandateId,
"username": "admin",
"email": "admin@example.com",
"fullName": "Administrator",
"disabled": False,
"language": "de",
"privilege": "sysadmin",
"hashedPassword": self._getPasswordHash("The 1st Poweron Admin") # Use a secure password in production!
}
createdUser = self.db.recordCreate("users", adminUser)
logger.info(f"Admin user created with ID {createdUser['id']}")
# Register the initial ID
self.db._registerInitialId("users", createdUser['id'])
# Update user context
self._userId = createdUser['id']
def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Unified user access management function that filters data based on user privileges
and adds access control attributes.
Args:
table: Name of the table
recordset: Recordset to filter based on access rules
Returns:
Filtered recordset with access control attributes
"""
return _uam(self.currentUser, table, recordset, self._mandateId, self._userId, self.db)
def _canModify(self, table: str, recordId: Optional[str] = None) -> bool:
"""
Checks if the current user can modify (create/update/delete) records in a table.
Args:
table: Name of the table
recordId: Optional record ID for specific record check
Returns:
Boolean indicating permission
"""
return _canModify(self.currentUser, table, recordId, self._mandateId, self._userId, self.db)
def getInitialId(self, table: str) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(table)
def _getPasswordHash(self, password: str) -> str:
"""Creates a hash for a password."""
return pwdContext.hash(password)
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
"""Checks if the password matches the hash."""
return pwdContext.verify(plainPassword, hashedPassword)
def _getCurrentTimestamp(self) -> str:
"""Returns the current timestamp in ISO format."""
from datetime import datetime
return datetime.now().isoformat()
# Mandate methods
def getAllMandates(self) -> List[Dict[str, Any]]:
"""Returns mandates based on user access level."""
allMandates = self.db.getRecordset("mandates")
return self._uam("mandates", allMandates)
def getMandate(self, mandateId: str) -> Optional[Dict[str, Any]]:
"""Returns a mandate by ID if user has access."""
mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
if not mandates:
return None
filteredMandates = self._uam("mandates", mandates)
return filteredMandates[0] if filteredMandates else None
def createMandate(self, name: str, language: str = "de") -> Dict[str, Any]:
"""Creates a new mandate if user has permission."""
if not self._canModify("mandates"):
raise PermissionError("No permission to create mandates")
mandateData = {
"name": name,
"language": language
}
return self.db.recordCreate("mandates", mandateData)
def updateMandate(self, mandateId: str, mandateData: Dict[str, Any]) -> Dict[str, Any]:
"""Updates a mandate if user has access."""
# Check if the mandate exists and user has access
mandate = self.getMandate(mandateId)
if not mandate:
raise ValueError(f"Mandate with ID {mandateId} not found")
if not self._canModify("mandates", mandateId):
raise PermissionError(f"No permission to update mandate {mandateId}")
# Update the mandate
return self.db.recordModify("mandates", mandateId, mandateData)
def deleteMandate(self, mandateId: str) -> bool:
"""
Deletes a mandate and all associated users and data if user has permission.
"""
# Check if the mandate exists and user has access
mandate = self.getMandate(mandateId)
if not mandate:
return False
if not self._canModify("mandates", mandateId):
raise PermissionError(f"No permission to delete mandate {mandateId}")
# Check if it's the initial mandate
initialMandateId = self.getInitialId("mandates")
if initialMandateId is not None and mandateId == initialMandateId:
logger.warning(f"Attempt to delete the Root mandate was prevented")
return False
# Find all users of the mandate
users = self.getUsersByMandate(mandateId)
# Delete all users of the mandate and their associated data
for user in users:
self.deleteUser(user["id"])
# Delete the mandate
success = self.db.recordDelete("mandates", mandateId)
if success:
logger.info(f"Mandate with ID {mandateId} was successfully deleted")
else:
logger.error(f"Error deleting mandate with ID {mandateId}")
return success
# User methods
def getAllUsers(self) -> List[Dict[str, Any]]:
"""Returns users based on user access level."""
allUsers = self.db.getRecordset("users")
filteredUsers = self._uam("users", allUsers)
# Remove password hashes
for user in filteredUsers:
if "hashedPassword" in user:
del user["hashedPassword"]
return filteredUsers
def getUsersByMandate(self, _mandateId: str) -> List[Dict[str, Any]]:
"""Returns users for a specific mandate if user has access."""
# First check if user has access to the mandate
mandate = self.getMandate(_mandateId)
if not mandate:
return []
# Get users for this mandate
users = self.db.getRecordset("users", recordFilter={"_mandateId": _mandateId})
filteredUsers = self._uam("users", users)
# Remove password hashes
for user in filteredUsers:
if "hashedPassword" in user:
del user["hashedPassword"]
return filteredUsers
def getUserByUsername(self, username: str) -> Optional[Dict[str, Any]]:
"""Returns a user by username."""
# Get all users without mandate filter
users = self.db.getRecordset("users")
for user in users:
if user.get("username") == username:
# Log the fields present in the user record
logger.debug(f"Found user {username} with fields: {list(user.keys())}")
# Return a complete copy of the user record with all fields
return {**user} # Use dict unpacking to ensure we get a complete copy with all fields
logger.debug(f"No user found with username {username}")
return None
def getUser(self, _userId: str) -> Optional[Dict[str, Any]]:
"""Returns a user by ID if user has access."""
users = self.db.getRecordset("users", recordFilter={"_userId": _userId})
if not users:
return None
filteredUsers = self._uam("users", users)
if not filteredUsers:
return None
user = filteredUsers[0]
# Remove password hash
if "hashedPassword" in user:
userCopy = user.copy()
del userCopy["hashedPassword"]
return userCopy
return user
def createUser(self, username: str, password: str, email: str = None,
fullName: str = None, language: str = "de", _mandateId: str = None,
disabled: bool = False, privilege: str = "user") -> Dict[str, Any]:
"""Creates a new user if current user has permission."""
# Validate username
if not username or len(username) < 3:
raise ValueError("Benutzername muss mindestens 3 Zeichen lang sein")
# Validate password
if not password:
raise ValueError("Passwort ist erforderlich")
# Password requirements
if len(password) < 8:
raise ValueError("Passwort muss mindestens 8 Zeichen lang sein")
if not any(c.isupper() for c in password):
raise ValueError("Passwort muss mindestens einen Grossbuchstaben enthalten")
if not any(c.islower() for c in password):
raise ValueError("Passwort muss mindestens einen Kleinbuchstaben enthalten")
if not any(c.isdigit() for c in password):
raise ValueError("Passwort muss mindestens eine Zahl enthalten")
if not any(c in "!@#$%^&*(),.?\":{}|<>" for c in password):
raise ValueError("Passwort muss mindestens ein Sonderzeichen enthalten")
# Validate email if provided
if email:
import re
email_pattern = r'^[^\s@]+@[^\s@]+\.[^\s@]+$'
if not re.match(email_pattern, email):
raise ValueError("Ungültiges E-Mail-Format")
# Check if the username already exists
existingUser = self.getUserByUsername(username)
if existingUser:
raise ValueError(f"Benutzer '{username}' existiert bereits")
# Use the provided _mandateId or the current context
userMandateId = _mandateId if _mandateId is not None else self._mandateId
# Check if user has access to the mandate
if userMandateId != self._mandateId and self.currentUser.get("privilege") != "sysadmin":
raise PermissionError(f"Keine Berechtigung, Benutzer in Mandat {userMandateId} zu erstellen")
if not self._canModify("users"):
raise PermissionError("Keine Berechtigung, Benutzer zu erstellen")
# Check privilege escalation
if (privilege == "sysadmin" or
(privilege == "admin" and self.currentUser.get("privilege") == "user")):
raise PermissionError(f"Keine Berechtigung, Benutzer mit höherem Privileg zu erstellen: {privilege}")
userData = {
"_mandateId": userMandateId,
"username": username,
"email": email,
"fullName": fullName,
"disabled": disabled,
"language": language,
"privilege": privilege,
"hashedPassword": self._getPasswordHash(password)
}
createdUser = self.db.recordCreate("users", userData)
# Clear the users table from cache to ensure fresh data
if "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
# Return the complete user record
return createdUser
def authenticateUser(self, username: str, password: str) -> Optional[Dict[str, Any]]:
"""Authenticates a user by username and password."""
# Clear the users table from cache and reload it
if "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
# Get user by username
user = self.getUserByUsername(username)
if not user:
raise ValueError("Benutzer nicht gefunden")
if not self._verifyPassword(password, user.get("hashedPassword", "")):
raise ValueError("Falsches Passwort")
# Check if the user is disabled
if user.get("disabled", False):
raise ValueError("Benutzer ist deaktiviert")
# Create a copy without password hash
authenticatedUser = {**user}
if "hashedPassword" in authenticatedUser:
del authenticatedUser["hashedPassword"]
return authenticatedUser
def updateUser(self, _userId: str, userData: Dict[str, Any]) -> Dict[str, Any]:
"""Updates a user if current user has permission."""
# Check if the user exists and current user has access
user = self.getUser(_userId)
if not user:
# Try to get the raw user record for admin access check
users = self.db.getRecordset("users", recordFilter={"_userId": _userId})
if not users:
raise ValueError(f"User with ID {_userId} not found")
# Check if current user is admin/sysadmin
if not self._canModify("users", _userId):
raise PermissionError(f"No permission to update user {_userId}")
user = users[0]
# Check privilege escalation
if "privilege" in userData:
currentPrivilege = self.currentUser.get("privilege")
targetPrivilege = userData["privilege"]
if (targetPrivilege == "sysadmin" and currentPrivilege != "sysadmin") or (
targetPrivilege == "admin" and currentPrivilege == "user"):
raise PermissionError(f"Cannot escalate privilege to {targetPrivilege}")
# If the password is being changed, hash it
if "password" in userData:
userData["hashedPassword"] = self._getPasswordHash(userData["password"])
del userData["password"]
# Update the user
updatedUser = self.db.recordModify("users", _userId, userData)
# Remove password hash from the response
if "hashedPassword" in updatedUser:
del updatedUser["hashedPassword"]
return updatedUser
def disableUser(self, _userId: str) -> Dict[str, Any]:
"""Disables a user if current user has permission."""
return self.updateUser(_userId, {"disabled": True})
def enableUser(self, _userId: str) -> Dict[str, Any]:
"""Enables a user if current user has permission."""
return self.updateUser(_userId, {"disabled": False})
def _deleteUserReferencedData(self, _userId: str) -> None:
"""Deletes all data associated with a user."""
# Delete user attributes
try:
attributes = self.db.getRecordset("attributes", recordFilter={"_userId": _userId})
for attribute in attributes:
self.db.recordDelete("attributes", attribute["id"])
except Exception as e:
logger.error(f"Error deleting attributes for user {_userId}: {e}")
logger.info(f"All referenced data for user {_userId} has been deleted")
def deleteUser(self, _userId: str) -> bool:
"""Deletes a user and all associated data if current user has permission."""
# Check if the user exists
users = self.db.getRecordset("users", recordFilter={"_userId": _userId})
if not users:
return False
# Check if current user has permission
if not self._canModify("users", _userId):
raise PermissionError(f"No permission to delete user {_userId}")
# Check if it's the initial user
initialUserId = self.getInitialId("users")
if initialUserId is not None and _userId == initialUserId:
logger.warning("Attempt to delete the Root Admin was prevented")
return False
# Delete all data associated with the user
self._deleteUserReferencedData(_userId)
# Delete the user
success = self.db.recordDelete("users", _userId)
if success:
logger.info(f"User with ID {_userId} was successfully deleted")
else:
logger.error(f"Error deleting user with ID {_userId}")
return success
# Singleton factory for GatewayInterface instances per context
_gatewayInterfaces = {}
def getGatewayInterface(_mandateId: str = None, _userId: str = None) -> GatewayInterface:
"""
Returns a GatewayInterface instance for the specified context.
Reuses existing instances.
"""
# For initialization, use empty strings instead of None
contextKey = f"{_mandateId or ''}_{_userId or ''}"
if contextKey not in _gatewayInterfaces:
_gatewayInterfaces[contextKey] = GatewayInterface(_mandateId or '', _userId or '')
return _gatewayInterfaces[contextKey]
# Initialize an instance with empty strings
getGatewayInterface('', '')