gateway/modules/interfaces/gatewayInterface.py
2025-05-21 19:38:06 +02:00

545 lines
21 KiB
Python

"""
Interface to the Gateway system.
Manages users and mandates for authentication.
"""
from datetime import datetime
import os
import logging
from typing import Dict, Any, List, Optional, Union
import importlib
import json
from passlib.context import CryptContext
from modules.connectors.connectorDbJson import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.gatewayAccess import GatewayAccess
from modules.interfaces.gatewayModel import User, Mandate, UserInDB
logger = logging.getLogger(__name__)
# Singleton factory for GatewayInterface instances per context
_gatewayInterfaces = {}
# Password-Hashing
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
class GatewayInterface:
"""
Interface to the Gateway system.
Manages users and mandates.
"""
def __init__(self):
"""Initializes the Gateway Interface."""
# Initialize database
self._initializeDatabase()
# Initialize standard records if needed
self._initRecords()
# Initialize variables
self.currentUser = None
self.userId = None
self.access = None # Will be set when user context is provided
def setUserContext(self, currentUser: Dict[str, Any]):
"""Sets the user context for the interface."""
if not currentUser:
logger.info("Initializing interface without user context")
return
self.currentUser = currentUser
self.userId = currentUser.get("id")
if not self.userId:
raise ValueError("Invalid user context: id is required")
# Add language settings
self.userLanguage = currentUser.get("language", "en") # Default user language
# Initialize access control with user context
self.access = GatewayAccess(self.currentUser, self.db)
logger.debug(f"User context set: userId={self.userId}")
def _initializeDatabase(self):
"""Initializes the database connection."""
try:
# Get configuration values with defaults
dbHost = APP_CONFIG.get("DB_GATEWAY_HOST", "data")
dbDatabase = APP_CONFIG.get("DB_GATEWAY_DATABASE", "gateway")
dbUser = APP_CONFIG.get("DB_GATEWAY_USER")
dbPassword = APP_CONFIG.get("DB_GATEWAY_PASSWORD_SECRET")
# Ensure the database directory exists
os.makedirs(dbHost, exist_ok=True)
self.db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword
)
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize database: {str(e)}")
raise
def _initRecords(self):
self._initRootMandate()
self._initAdminUser()
def _initRootMandate(self):
"""Creates the Root mandate if it doesn't exist."""
existingMandateId = self.getInitialId("mandates")
mandates = self.db.getRecordset("mandates")
if existingMandateId is None or not mandates:
logger.info("Creating Root mandate")
rootMandate = {
"name": "Root",
"language": "en"
}
createdMandate = self.db.recordCreate("mandates", rootMandate)
logger.info(f"Root mandate created with ID {createdMandate['id']}")
# Register the initial ID
self.db._registerInitialId("mandates", createdMandate['id'])
# Update mandate context
self.currentUser["mandateId"] = createdMandate['id']
def _initAdminUser(self):
"""Creates the Admin user if it doesn't exist."""
existingUserId = self.getInitialId("users")
users = self.db.getRecordset("users")
if existingUserId is None or not users:
logger.info("Creating Admin user")
adminUser = {
"mandateId": self.getInitialId("mandates"),
"username": "admin",
"email": "admin@example.com",
"fullName": "Administrator",
"disabled": False,
"language": "en",
"privilege": "sysadmin",
"authenticationAuthority": "local",
"hashedPassword": self._getPasswordHash("The 1st Poweron Admin") # Use a secure password in production!
}
createdUser = self.db.recordCreate("users", adminUser)
logger.info(f"Admin user created with ID {createdUser['id']}")
# Register the initial ID
self.db._registerInitialId("users", createdUser['id'])
# Update user context
self.currentUser = createdUser
self.userId = createdUser.get("id")
def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Unified user access management function that filters data based on user privileges
and adds access control attributes.
Args:
table: Name of the table
recordset: Recordset to filter based on access rules
Returns:
Filtered recordset with access control attributes
"""
return self.access.uam(table, recordset)
def _canModify(self, table: str, recordId: Optional[str] = None) -> bool:
"""
Checks if the current user can modify (create/update/delete) records in a table.
Args:
table: Name of the table
recordId: Optional record ID for specific record check
Returns:
Boolean indicating permission
"""
return self.access.canModify(table, recordId)
def getInitialId(self, table: str) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(table)
def _getPasswordHash(self, password: str) -> str:
"""Creates a hash for a password."""
return pwdContext.hash(password)
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
"""Checks if the password matches the hash."""
return pwdContext.verify(plainPassword, hashedPassword)
# Mandate methods
def getAllMandates(self) -> List[Mandate]:
"""Returns mandates based on user access level."""
allMandates = self.db.getRecordset("mandates")
filteredMandates = self._uam("mandates", allMandates)
return [Mandate(**mandate) for mandate in filteredMandates]
def getMandate(self, mandateId: str) -> Optional[Mandate]:
"""Returns a mandate by ID if user has access."""
mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
if not mandates:
return None
filteredMandates = self._uam("mandates", mandates)
if not filteredMandates:
return None
return Mandate(**filteredMandates[0])
def createMandate(self, name: str, language: str = "en") -> Mandate:
"""Creates a new mandate if user has permission."""
if not self._canModify("mandates"):
raise PermissionError("No permission to create mandates")
# Create and validate mandate data using Pydantic model
mandateData = Mandate(
name=name,
language=language
)
# Convert to dict for database storage
created = self.db.recordCreate("mandates", mandateData.model_dump())
return Mandate(**created)
def updateMandate(self, mandateId: str, mandateData: Dict[str, Any]) -> Mandate:
"""Updates a mandate if user has access."""
# Check if the mandate exists and user has access
mandate = self.getMandate(mandateId)
if not mandate:
raise ValueError(f"Mandate with ID {mandateId} not found")
if not self._canModify("mandates", mandateId):
raise PermissionError(f"No permission to update mandate {mandateId}")
# Validate update data using Pydantic model
try:
# Create a new Mandate instance with existing data plus updates
updatedMandate = Mandate(**{**mandate.model_dump(), **mandateData})
except Exception as e:
raise ValueError(f"Invalid mandate data: {str(e)}")
# Update the mandate
updated = self.db.recordModify("mandates", mandateId, updatedMandate.model_dump())
return Mandate(**updated)
def deleteMandate(self, mandateId: str) -> bool:
"""
Deletes a mandate and all associated users and data if user has permission.
"""
# Check if the mandate exists and user has access
mandate = self.getMandate(mandateId)
if not mandate:
return False
if not self._canModify("mandates", mandateId):
raise PermissionError(f"No permission to delete mandate {mandateId}")
# Check if it's the initial mandate
initialMandateId = self.getInitialId("mandates")
if initialMandateId is not None and mandateId == initialMandateId:
logger.warning(f"Attempt to delete the Root mandate was prevented")
return False
# Find all users of the mandate
users = self.getUsersByMandate(mandateId)
# Delete all users of the mandate and their associated data
for user in users:
self.deleteUser(user["id"])
# Delete the mandate
success = self.db.recordDelete("mandates", mandateId)
if success:
logger.info(f"Mandate with ID {mandateId} was successfully deleted")
else:
logger.error(f"Error deleting mandate with ID {mandateId}")
return success
# User methods
def getAllUsers(self) -> List[User]:
"""Returns users based on user access level."""
allUsers = self.db.getRecordset("users")
filteredUsers = self._uam("users", allUsers)
# Remove password hashes
for user in filteredUsers:
if "hashedPassword" in user:
del user["hashedPassword"]
return [User(**user) for user in filteredUsers]
def getUsersByMandate(self, mandateId: str) -> List[User]:
"""Returns users for a specific mandate if user has access."""
# Get users for this mandate
users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId})
filteredUsers = self._uam("users", users)
# Remove password hashes
for user in filteredUsers:
if "hashedPassword" in user:
del user["hashedPassword"]
return [User(**user) for user in filteredUsers]
def getUserByUsername(self, username: str) -> Optional[User]:
"""Returns a user by username."""
try:
# Get users table
users = self.db.getRecordset("users")
if not users:
return None
# Find user by username
for user in users:
if user.get("username") == username:
logger.info(f"Found user with username {username}")
logger.debug(f"User fields: {list(user.keys())}")
return User(**user)
logger.info(f"No user found with username {username}")
return None
except Exception as e:
logger.error(f"Error getting user by username: {str(e)}")
return None
def getUser(self, userId: str) -> Optional[User]:
"""Returns a user by ID if user has access."""
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
return None
filteredUsers = self._uam("users", users)
if not filteredUsers:
return None
user = filteredUsers[0]
# Remove password hash
if "hashedPassword" in user:
userCopy = user.copy()
del userCopy["hashedPassword"]
return User(**userCopy)
return User(**user)
def createUser(self, username: str, password: str = None, email: str = None, fullName: str = None,
language: str = "en", disabled: bool = False,
privilege: str = "user", authenticationAuthority: str = "local") -> User:
"""Create a new user"""
try:
# Validate username
if not username:
raise ValueError("Username is required")
# Check if user already exists with the same authentication authority
existingUser = self.getUserByUsername(username)
if existingUser and existingUser.authenticationAuthority == authenticationAuthority:
raise ValueError(f"Username '{username}' already exists with {authenticationAuthority} authentication")
# Validate password for local authentication
if authenticationAuthority == "local":
if not password:
raise ValueError("Password is required for local authentication")
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
# Create user data using UserInDB model
userData = UserInDB(
username=username,
email=email,
fullName=fullName,
language=language,
mandateId=self.currentUser.get("mandateId"),
disabled=disabled,
privilege=privilege,
authenticationAuthority=authenticationAuthority,
hashedPassword=self._getPasswordHash(password) if authenticationAuthority == "local" else None
)
# Create user record
createdRecord = self.db.recordCreate("users", userData.model_dump(exclude_none=True))
if not createdRecord or not createdRecord.get("id"):
raise ValueError("Failed to create user record")
# Get created user using the returned ID
createdUser = self.db.getRecordset("users", recordFilter={"id": createdRecord["id"]})
if not createdUser or len(createdUser) == 0:
# Try to get user by username as fallback
createdUser = self.db.getRecordset("users", recordFilter={"username": userData.username})
if not createdUser or len(createdUser) == 0:
raise ValueError("Failed to retrieve created user")
# Clear users table from cache
if hasattr(self.db, '_tablesCache') and "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
return User(**createdUser[0])
except ValueError as e:
logger.error(f"Error creating user: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error creating user: {str(e)}")
raise ValueError(f"Failed to create user: {str(e)}")
def authenticateUser(self, username: str, password: str = None) -> Optional[User]:
"""Authenticates a user by username and password."""
# Clear the users table from cache and reload it
if "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
# Get user by username
user = self.getUserByUsername(username)
if not user:
raise ValueError("Benutzer nicht gefunden")
# Check if the user is disabled
if user.disabled:
raise ValueError("Benutzer ist deaktiviert")
# Handle authentication based on authority
auth_authority = user.authenticationAuthority
if auth_authority == "local":
if not password:
raise ValueError("Passwort ist erforderlich")
# Get the full user record with password hash for verification
userWithPassword = UserInDB(**self.db.getRecordset("users", recordFilter={"id": user.id})[0])
if not self._verifyPassword(password, userWithPassword.hashedPassword):
raise ValueError("Falsches Passwort")
elif auth_authority == "microsoft":
# For Microsoft users, we don't verify the password here
# The authentication is handled by the Microsoft OAuth flow
pass
else:
raise ValueError(f"Unbekannte Authentifizierungsmethode: {auth_authority}")
return user
def updateUser(self, userId: str, userData: Dict[str, Any]) -> User:
"""Updates a user if current user has permission."""
# Check if the user exists and current user has access
user = self.getUser(userId)
if not user:
# Try to get the raw user record for admin access check
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
raise ValueError(f"User with ID {userId} not found")
# Check if current user is admin/sysadmin
if not self._canModify("users", userId):
raise PermissionError(f"No permission to update user {userId}")
user = users[0]
# Check privilege escalation
if "privilege" in userData:
currentPrivilege = self.currentUser.get("privilege")
targetPrivilege = userData["privilege"]
if (targetPrivilege == "sysadmin" and currentPrivilege != "sysadmin") or (
targetPrivilege == "admin" and currentPrivilege == "user"):
raise PermissionError(f"Cannot escalate privilege to {targetPrivilege}")
# If the password is being changed, hash it
if "password" in userData:
userData["hashedPassword"] = self._getPasswordHash(userData["password"])
del userData["password"]
try:
# Create a new UserInDB instance with existing data plus updates
updatedUser = UserInDB(**{**user.model_dump(), **userData})
except Exception as e:
raise ValueError(f"Invalid user data: {str(e)}")
# Update the user
updated = self.db.recordModify("users", userId, updatedUser.model_dump(exclude_none=True))
# Return User model without password hash
return User(**updated)
def disableUser(self, userId: str) -> User:
"""Disables a user if current user has permission."""
return self.updateUser(userId, {"disabled": True})
def enableUser(self, userId: str) -> User:
"""Enables a user if current user has permission."""
return self.updateUser(userId, {"disabled": False})
def _deleteUserReferencedData(self, userId: str) -> None:
"""Deletes all data associated with a user."""
# Delete user attributes
try:
attributes = self.db.getRecordset("attributes", recordFilter={"createdBy": userId})
for attribute in attributes:
self.db.recordDelete("attributes", attribute["id"])
except Exception as e:
logger.error(f"Error deleting attributes for user {userId}: {e}")
logger.info(f"All referenced data for user {userId} has been deleted")
def deleteUser(self, userId: str) -> bool:
"""Deletes a user and all associated data if current user has permission."""
# Check if the user exists
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
return False
# Check if current user has permission
if not self._canModify("users", userId):
raise PermissionError(f"No permission to delete user {userId}")
# Check if it's the initial user
initialUserId = self.getInitialId("users")
if initialUserId is not None and userId == initialUserId:
logger.warning("Attempt to delete the Root Admin was prevented")
return False
# Delete all data associated with the user
self._deleteUserReferencedData(userId)
# Delete the user
success = self.db.recordDelete("users", userId)
if success:
logger.info(f"User with ID {userId} was successfully deleted")
else:
logger.error(f"Error deleting user with ID {userId}")
return success
def getInterface(currentUser: Dict[str, Any] = None) -> 'GatewayInterface':
"""
Returns a GatewayInterface instance.
If currentUser is provided, initializes with user context.
Otherwise, returns an instance with only database access.
"""
# Create new instance if not exists
if "default" not in _gatewayInterfaces:
_gatewayInterfaces["default"] = GatewayInterface()
interface = _gatewayInterfaces["default"]
if currentUser:
interface.setUserContext(currentUser)
else:
logger.info("Returning interface without user context")
return interface