prod azure 1.0.5

This commit is contained in:
ValueOn AG 2025-05-04 00:12:28 +02:00
parent da2983c5a2
commit 4c1d81160e
3 changed files with 1050 additions and 2 deletions

View file

@ -0,0 +1,569 @@
import json
import os
from typing import List, Dict, Any, Optional, Union
import logging
logger = logging.getLogger(__name__)
class DatabaseConnector:
"""
A connector for JSON-based data storage.
Provides generic database operations with tenant and user context support.
"""
def __init__(self, dbHost: str, dbDatabase: str, dbUser: str = None, dbPassword: str = None,
mandateId: int = None, userId: int = None, skipInitialIdLookup: bool = False):
"""
Initializes the JSON database connector.
Args:
dbHost: Directory for the JSON files
dbDatabase: Database name
dbUser: Username for authentication (optional)
dbPassword: API key for authentication (optional)
mandateId: Context parameter for the tenant
userId: Context parameter for the user
skipInitialIdLookup: When True, skips looking up initial IDs for mandateId and userId
"""
# Store the input parameters
self.dbHost = dbHost
self.dbDatabase = dbDatabase
self.dbUser = dbUser
self.dbPassword = dbPassword
self.skipInitialIdLookup = skipInitialIdLookup
# Check if context parameters are set
if mandateId is None or userId is None:
raise ValueError("mandateId and userId must be set")
# Ensure the database directory exists
self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)
os.makedirs(self.dbFolder, exist_ok=True)
# Cache for loaded data
self._tablesCache = {}
# Initialize system table
self._systemTableName = "_system"
self._initializeSystemTable()
# Temporarily store mandateId and userId
self._mandateId = mandateId
self._userId = userId
# If mandateId or userId are 0 and we're not skipping ID lookup, try to use the initial IDs
if not skipInitialIdLookup:
if mandateId == 0:
initialMandateId = self.getInitialId("mandates")
if initialMandateId is not None:
self._mandateId = initialMandateId
logger.info(f"Using initial mandateId: {initialMandateId} instead of 0")
if userId == 0:
initialUserId = self.getInitialId("users")
if initialUserId is not None:
self._userId = initialUserId
logger.info(f"Using initial userId: {initialUserId} instead of 0")
# Set the effective IDs as properties
self.mandateId = self._mandateId
self.userId = self._userId
logger.info(f"DatabaseConnector initialized for directory: {self.dbFolder}")
logger.debug(f"Context: mandateId={self.mandateId}, userId={self.userId}")
def _initializeSystemTable(self):
"""Initializes the system table if it doesn't exist yet."""
systemTablePath = self._getTablePath(self._systemTableName)
if not os.path.exists(systemTablePath):
emptySystemTable = {}
self._saveSystemTable(emptySystemTable)
logger.info(f"System table initialized in {systemTablePath}")
else:
# Load existing system table to ensure it's available
self._loadSystemTable()
logger.debug(f"Existing system table loaded from {systemTablePath}")
def _loadSystemTable(self) -> Dict[str, int]:
"""Loads the system table with the initial IDs."""
# Check if system table is in cache
if f"_{self._systemTableName}" in self._tablesCache:
return self._tablesCache[f"_{self._systemTableName}"]
systemTablePath = self._getTablePath(self._systemTableName)
try:
if os.path.exists(systemTablePath):
with open(systemTablePath, 'r', encoding='utf-8') as f:
data = json.load(f)
# Store in cache with special prefix to avoid collision with regular tables
self._tablesCache[f"_{self._systemTableName}"] = data
return data
else:
self._tablesCache[f"_{self._systemTableName}"] = {}
return {}
except Exception as e:
logger.error(f"Error loading the system table: {e}")
self._tablesCache[f"_{self._systemTableName}"] = {}
return {}
def _saveSystemTable(self, data: Dict[str, int]) -> bool:
"""Saves the system table with the initial IDs."""
systemTablePath = self._getTablePath(self._systemTableName)
try:
with open(systemTablePath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# Update cache
self._tablesCache[f"_{self._systemTableName}"] = data
return True
except Exception as e:
logger.error(f"Error saving the system table: {e}")
return False
def _getTablePath(self, table: str) -> str:
"""Returns the full path to a table file"""
return os.path.join(self.dbFolder, f"{table}.json")
def _loadTable(self, table: str) -> List[Dict[str, Any]]:
"""Loads a table from the corresponding JSON file"""
path = self._getTablePath(table)
# If the table is the system table, load it directly
if table == self._systemTableName:
return [] # The system table is not treated like normal tables
# If the table is already in the cache, use the cache
if table in self._tablesCache:
return self._tablesCache[table]
# Otherwise load the file
try:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
self._tablesCache[table] = data
# If data was loaded and no initial ID is registered yet,
# register the ID of the first record (if available)
if data and not self.hasInitialId(table):
if "id" in data[0]:
self._registerInitialId(table, data[0]["id"])
logger.info(f"Initial ID {data[0]['id']} for table {table} retroactively registered")
return data
else:
# If the file doesn't exist, create an empty table
logger.info(f"New table {table}")
self._tablesCache[table] = []
self._saveTable(table, [])
return []
except Exception as e:
logger.error(f"Error loading table {table}: {e}")
return []
def _saveTable(self, table: str, data: List[Dict[str, Any]]) -> bool:
"""Saves a table to the corresponding JSON file"""
# The system table is handled specially
if table == self._systemTableName:
return False
path = self._getTablePath(table)
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# Update the cache
self._tablesCache[table] = data
return True
except Exception as e:
logger.error(f"Error saving table {table}: {e}")
return False
def _filterByContext(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filters records by tenant and user context,
if these fields exist in the record.
"""
filteredRecords = []
for record in records:
# Check if mandateId exists in the record and is not null
hasMandate = "mandateId" in record and record["mandateId"] is not None and record["mandateId"] != ""
# Check if userId exists in the record and is not null
hasUser = "userId" in record and record["userId"] is not None and record["userId"] != ""
# If both exist, filter accordingly
if hasMandate and hasUser:
if record["mandateId"] == self.mandateId:
filteredRecords.append(record)
# If only mandateId exists
elif hasMandate and not hasUser:
if record["mandateId"] == self.mandateId:
filteredRecords.append(record)
# If neither mandateId nor userId exist, add the record
elif not hasMandate and not hasUser:
filteredRecords.append(record)
return filteredRecords
def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Applies a record filter to the records"""
if not recordFilter:
return records
filteredRecords = []
for record in records:
match = True
for field, value in recordFilter.items():
# Check if the field exists
if field not in record:
match = False
break
# Handle type conversion for integer comparisons both ways
if isinstance(value, int) and isinstance(record[field], str) and record[field].isdigit():
# Filter value is int, record value is string
if value != int(record[field]):
match = False
break
elif isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
# Filter value is string, record value is int
if record[field] != int(value):
match = False
break
# Otherwise direct comparison
elif record[field] != value:
match = False
break
if match:
filteredRecords.append(record)
return filteredRecords
def _registerInitialId(self, table: str, initialId: int) -> bool:
"""
Registers the initial ID for a table.
Args:
table: Name of the table
initialId: The initial ID
Returns:
True on success, False on error
"""
try:
# Load the current system table
systemData = self._loadSystemTable()
# Only register if not already present
if table not in systemData:
systemData[table] = initialId
success = self._saveSystemTable(systemData)
if success:
logger.info(f"Initial ID {initialId} for table {table} registered")
return success
return True # If already present, this is not an error
except Exception as e:
logger.error(f"Error registering the initial ID for table {table}: {e}")
return False
def _removeInitialId(self, table: str) -> bool:
"""
Removes the initial ID for a table from the system table.
Args:
table: Name of the table
Returns:
True on success, False on error
"""
try:
# Load the current system table
systemData = self._loadSystemTable()
# Remove the entry if it exists
if table in systemData:
del systemData[table]
success = self._saveSystemTable(systemData)
if success:
logger.info(f"Initial ID for table {table} removed from system table")
return success
return True # If not present, this is not an error
except Exception as e:
logger.error(f"Error removing initial ID for table {table}: {e}")
return False
# Public API
def getTables(self) -> List[str]:
"""
Returns a list of all available tables.
Returns:
List of table names
"""
tables = []
try:
for filename in os.listdir(self.dbFolder):
if filename.endswith('.json') and not filename.startswith('_'):
tableName = filename[:-5] # Remove the .json extension
tables.append(tableName)
except Exception as e:
logger.error(f"Error reading the database directory: {e}")
return tables
def getFields(self, table: str) -> List[str]:
"""
Returns a list of all fields in a table.
Args:
table: Name of the table
Returns:
List of field names
"""
# Load the table data
data = self._loadTable(table)
if not data:
return []
# Take the first record as a reference for the fields
fields = list(data[0].keys()) if data else []
return fields
def getSchema(self, table: str, language: str = None) -> Dict[str, Dict[str, Any]]:
"""
Returns a schema object for a table with data types and labels.
Args:
table: Name of the table
language: Language for the labels (optional)
Returns:
Schema object with fields, data types and labels
"""
# Load the table data
data = self._loadTable(table)
schema = {}
if not data:
return schema
# Take the first record as a reference for the fields and data types
firstRecord = data[0]
for field, value in firstRecord.items():
# Determine the data type
dataType = type(value).__name__
# Create label (default is the field name)
label = field
schema[field] = {
"type": dataType,
"label": label
}
return schema
def getRecordset(self, table: str, fieldFilter: List[str] = None, recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Returns a list of records from a table, filtered by criteria.
Args:
table: Name of the table
fieldFilter: Filter for fields (which fields should be returned)
recordFilter: Filter for records (which records should be returned)
Returns:
List of filtered records
"""
# Load the table data
data = self._loadTable(table)
logger.debug(f"getRecordset: data volume of {len(data)} bytes")
# Filter by tenant and user context
filteredData = self._filterByContext(data)
# Apply recordFilter if available
if recordFilter:
filteredData = self._applyRecordFilter(filteredData, recordFilter)
# If fieldFilter is available, reduce the fields
if fieldFilter and isinstance(fieldFilter, list):
result = []
for record in filteredData:
filteredRecord = {}
for field in fieldFilter:
if field in record:
filteredRecord[field] = record[field]
result.append(filteredRecord)
return result
return filteredData
def recordCreate(self, table: str, recordData: Dict[str, Any]) -> Dict[str, Any]:
"""
Creates a new record in the table.
Args:
table: Name of the table
recordData: Data for the new record
Returns:
The created record
"""
# Load the table data
data = self._loadTable(table)
# Add mandateId and userId if not present or 0
if "mandateId" not in recordData or recordData["mandateId"] == 0:
recordData["mandateId"] = self.mandateId
if "userId" not in recordData or recordData["userId"] == 0:
recordData["userId"] = self.userId
# Determine the next ID if not present
if "id" not in recordData:
nextId = 1
if data:
nextId = max(record["id"] for record in data if "id" in record) + 1
recordData["id"] = nextId
# If the table is empty and a system ID should be registered
if not data:
self._registerInitialId(table, recordData["id"])
logger.info(f"Initial ID {recordData['id']} for table {table} has been registered")
# Add the new record
data.append(recordData)
# Save the updated table
if self._saveTable(table, data):
return recordData
else:
raise ValueError(f"Error creating the record in table {table}")
def recordDelete(self, table: str, recordId: Union[str, int]) -> bool:
"""
Deletes a record from the table.
Args:
table: Name of the table
recordId: ID of the record to delete
Returns:
True on success, False on error
"""
# Load table data
data = self._loadTable(table)
# Search for the record
for i, record in enumerate(data):
if "id" in record and record["id"] == recordId:
# Check if the record belongs to the current mandate
if "mandateId" in record and record["mandateId"] != self.mandateId:
raise ValueError("Not your mandate")
# Check if it's an initial record
initialId = self.getInitialId(table)
if initialId is not None and initialId == recordId:
# Remove this entry from the system table
self._removeInitialId(table)
logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")
# Delete the record
del data[i]
# Save the updated table
return self._saveTable(table, data)
# Record not found
return False
def recordModify(self, table: str, recordId: Union[str, int], recordData: Dict[str, Any]) -> Dict[str, Any]:
"""
Modifies a record in the table.
Args:
table: Name of the table
recordId: ID of the record to modify
recordData: New data for the record
Returns:
The updated record
"""
# Load table data
data = self._loadTable(table)
# Search for the record
for i, record in enumerate(data):
if "id" in record and record["id"] == recordId:
# Check if the record belongs to the current mandate
if "mandateId" in record and record["mandateId"] != self.mandateId:
raise ValueError("Not your mandate")
# Prevent changing the ID
if "id" in recordData and recordData["id"] != recordId:
raise ValueError(f"The ID of a record in table {table} cannot be changed")
# Update the record
for key, value in recordData.items():
data[i][key] = value
# Save the updated table
if self._saveTable(table, data):
return data[i]
else:
raise ValueError(f"Error updating record in table {table}")
# Record not found
raise ValueError(f"Record with ID {recordId} not found in table {table}")
def hasInitialId(self, table: str) -> bool:
"""
Checks if an initial ID is registered for a table.
Args:
table: Name of the table
Returns:
True if an initial ID is registered, otherwise False
"""
systemData = self._loadSystemTable()
return table in systemData
def getInitialId(self, table: str) -> Optional[int]:
"""
Returns the initial ID for a table.
Args:
table: Name of the table
Returns:
The initial ID or None if not present
"""
systemData = self._loadSystemTable()
initialId = systemData.get(table)
logger.debug(f"Database '{self.dbDatabase}': Initial ID for table '{table}' is {initialId}")
if initialId is None:
logger.debug(f"No initial ID found for table {table}")
return initialId
def getAllInitialIds(self) -> Dict[str, int]:
"""
Returns all registered initial IDs.
Returns:
Dictionary with table names as keys and initial IDs as values
"""
systemData = self._loadSystemTable()
return systemData.copy() # Return a copy to protect the original

View file

@ -205,6 +205,7 @@ class DatabaseConnector:
return filteredRecords
def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Applies a record filter to the records"""
if not recordFilter:
@ -221,8 +222,14 @@ class DatabaseConnector:
match = False
break
# If the filter value is an integer string and the record field is an integer
if isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
# Handle type conversion for integer comparisons both ways
if isinstance(value, int) and isinstance(record[field], str) and record[field].isdigit():
# Filter value is int, record value is string
if value != int(record[field]):
match = False
break
elif isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
# Filter value is string, record value is int
if record[field] != int(value):
match = False
break
@ -381,6 +388,7 @@ class DatabaseConnector:
"""
# Load the table data
data = self._loadTable(table)
logger.debug(f"getRecordset: data volume of {len(data)} bytes")
# Filter by tenant and user context
filteredData = self._filterByContext(data)

View file

@ -0,0 +1,471 @@
"""
Interface to the Gateway system.
Manages users and mandates for authentication.
"""
import os
import logging
from typing import Dict, Any, List, Optional, Union
import importlib
from passlib.context import CryptContext
from connectors.connectorDbJson import DatabaseConnector
from modules.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Password-Hashing
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
class GatewayInterface:
"""
Interface to the Gateway system.
Manages users and mandates.
"""
def __init__(self, mandateId: int = None, userId: int = None):
"""
Initializes the Gateway Interface with optional mandate and user context.
Args:
mandateId: ID of the current mandate (optional)
userId: ID of the current user (optional)
"""
# Context can be empty during initialization
self.mandateId = mandateId
self.userId = userId
# Import data model module
try:
self.modelModule = importlib.import_module("modules.gatewayModel")
logger.info("gatewayModel successfully imported")
except ImportError as e:
logger.error(f"Error importing gatewayModel: {e}")
raise
# Initialize database
self._initializeDatabase()
def _initializeDatabase(self):
"""
Initializes the database with minimal objects
"""
self.db = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
mandateId=self.mandateId if self.mandateId else 0,
userId=self.userId if self.userId else 0
)
# Create Root mandate if needed
existingMandateId = self.getInitialId("mandates")
mandates = self.db.getRecordset("mandates")
if existingMandateId is None or not mandates:
logger.info("Creating Root mandate")
rootMandate = {
"name": "Root",
"language": "de"
}
createdMandate = self.db.recordCreate("mandates", rootMandate)
logger.info(f"Root mandate created with ID {createdMandate['id']}")
# Update mandate context
self.mandateId = createdMandate['id']
self.userId = createdMandate['userId']
# Recreate connector with correct context
self.db = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
mandateId=self.mandateId,
userId=self.userId
)
# Create Admin user if needed
existingUserId = self.getInitialId("users")
users = self.db.getRecordset("users")
if existingUserId is None or not users:
logger.info("Creating Admin user")
adminUser = {
"mandateId": self.mandateId,
"username": "admin",
"email": "admin@example.com",
"fullName": "Administrator",
"disabled": False,
"language": "de",
"privilege": "sysadmin", # SysAdmin privilege
"hashedPassword": self._getPasswordHash("admin") # Use a secure password in production!
}
createdUser = self.db.recordCreate("users", adminUser)
logger.info(f"Admin user created with ID {createdUser['id']}")
# Update user context
self.userId = createdUser['id']
# Recreate connector with correct context
self.db = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
mandateId=self.mandateId,
userId=self.userId
)
def getInitialId(self, table: str) -> Optional[int]:
"""Returns the initial ID for a table"""
return self.db.getInitialId(table)
def _getPasswordHash(self, password: str) -> str:
"""Creates a hash for a password"""
return pwdContext.hash(password)
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
"""Checks if the password matches the hash"""
return pwdContext.verify(plainPassword, hashedPassword)
def _getCurrentTimestamp(self) -> str:
"""Returns the current timestamp in ISO format"""
from datetime import datetime
return datetime.now().isoformat()
# Mandate methods
def getAllMandates(self) -> List[Dict[str, Any]]:
"""Returns all mandates"""
return self.db.getRecordset("mandates")
def getMandate(self, mandateId: int) -> Optional[Dict[str, Any]]:
"""Returns a mandate by its ID"""
mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
if mandates:
return mandates[0]
return None
def createMandate(self, name: str, language: str = "de") -> Dict[str, Any]:
"""Creates a new mandate"""
mandateData = {
"name": name,
"language": language
}
return self.db.recordCreate("mandates", mandateData)
def updateMandate(self, mandateId: int, mandateData: Dict[str, Any]) -> Dict[str, Any]:
"""
Updates an existing mandate
Args:
mandateId: The ID of the mandate to update
mandateData: The mandate data to update
Returns:
Dict[str, Any]: The updated mandate data
Raises:
ValueError: If the mandate is not found
"""
# Check if the mandate exists
mandate = self.getMandate(mandateId)
if not mandate:
raise ValueError(f"Mandate with ID {mandateId} not found")
# Update the mandate
updatedMandate = self.db.recordModify("mandates", mandateId, mandateData)
return updatedMandate
def deleteMandate(self, mandateId: int) -> bool:
"""
Deletes a mandate and all associated users and data
Args:
mandateId: The ID of the mandate to delete
Returns:
bool: True if the mandate was successfully deleted, otherwise False
"""
# Check if the mandate exists
mandate = self.getMandate(mandateId)
if not mandate:
return False
# Check if it's the initial mandate
initialMandateId = self.getInitialId("mandates")
if initialMandateId is not None and mandateId == initialMandateId:
logger.warning(f"Attempt to delete the Root mandate was prevented")
return False
# Find all users of the mandate
users = self.getUsersByMandate(mandateId)
# Delete all users of the mandate and their associated data
for user in users:
self.deleteUser(user["id"])
# Delete the mandate
success = self.db.recordDelete("mandates", mandateId)
if success:
logger.info(f"Mandate with ID {mandateId} was successfully deleted")
else:
logger.error(f"Error deleting mandate with ID {mandateId}")
return success
# User methods
def getAllUsers(self) -> List[Dict[str, Any]]:
"""Returns all users"""
users = self.db.getRecordset("users")
# Remove password hashes from the response
for user in users:
if "hashedPassword" in user:
del user["hashedPassword"]
return users
def getUsersByMandate(self, mandateId: int) -> List[Dict[str, Any]]:
"""
Returns all users of a specific mandate
Args:
mandateId: The ID of the mandate
Returns:
List[Dict[str, Any]]: List of users in the mandate
"""
users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId})
# Remove password hashes from the response
for user in users:
if "hashedPassword" in user:
del user["hashedPassword"]
return users
def getUserByUsername(self, username: str) -> Optional[Dict[str, Any]]:
"""Returns a user by username"""
users = self.db.getRecordset("users")
for user in users:
if user.get("username") == username:
return user
return None
def getUser(self, userId: int) -> Optional[Dict[str, Any]]:
"""Returns a user by ID"""
users = self.db.getRecordset("users", recordFilter={"id": userId})
if users:
user = users[0]
# Remove password hash from the API response
if "hashedPassword" in user:
userCopy = user.copy()
del userCopy["hashedPassword"]
return userCopy
return user
return None
def createUser(self, username: str, password: str, email: str = None,
fullName: str = None, language: str = "de", mandateId: int = None,
disabled: bool = False, privilege: str = "user") -> Dict[str, Any]:
"""
Creates a new user
Args:
username: The username
password: The password
email: The email address (optional)
fullName: The full name (optional)
language: The preferred language (default: "de")
mandateId: The ID of the mandate (optional)
disabled: Whether the user is disabled (default: False)
privilege: The privilege level (default: "user")
Returns:
Dict[str, Any]: The created user data
Raises:
ValueError: If the username already exists
"""
# Check if the username already exists
existingUser = self.getUserByUsername(username)
if existingUser:
raise ValueError(f"User '{username}' already exists")
# Use the provided mandateId or the current context
userMandateId = mandateId if mandateId is not None else self.mandateId
userData = {
"mandateId": userMandateId,
"username": username,
"email": email,
"fullName": fullName,
"disabled": disabled,
"language": language,
"privilege": privilege,
"hashedPassword": self._getPasswordHash(password)
}
createdUser = self.db.recordCreate("users", userData)
# Remove password hash from the response
if "hashedPassword" in createdUser:
del createdUser["hashedPassword"]
return createdUser
def authenticateUser(self, username: str, password: str) -> Optional[Dict[str, Any]]:
"""
Authenticates a user by username and password
Args:
username: The username
password: The password
Returns:
Optional[Dict[str, Any]]: The user data or None if authentication fails
"""
user = self.getUserByUsername(username)
if not user:
return None
if not self._verifyPassword(password, user.get("hashedPassword", "")):
return None
# Check if the user is disabled
if user.get("disabled", False):
return None
# Create a copy without password hash
authenticatedUser = {**user}
if "hashedPassword" in authenticatedUser:
del authenticatedUser["hashedPassword"]
return authenticatedUser
def updateUser(self, userId: int, userData: Dict[str, Any]) -> Dict[str, Any]:
"""
Updates a user
Args:
userId: The ID of the user to update
userData: The user data to update
Returns:
Dict[str, Any]: The updated user data
Raises:
ValueError: If the user is not found
"""
# Get the current user with password hash (directly from DB)
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
raise ValueError(f"User with ID {userId} not found")
user = users[0]
# If the password is being changed, hash it
if "password" in userData:
userData["hashedPassword"] = self._getPasswordHash(userData["password"])
del userData["password"]
# Update the user
updatedUser = self.db.recordModify("users", userId, userData)
# Remove password hash from the response
if "hashedPassword" in updatedUser:
del updatedUser["hashedPassword"]
return updatedUser
def disableUser(self, userId: int) -> Dict[str, Any]:
"""Disables a user"""
return self.updateUser(userId, {"disabled": True})
def enableUser(self, userId: int) -> Dict[str, Any]:
"""Enables a user"""
return self.updateUser(userId, {"disabled": False})
def _deleteUserReferencedData(self, userId: int) -> None:
"""
Deletes all data associated with a user
Args:
userId: The ID of the user
"""
# Here all tables are searched and all entries referencing this user are deleted
# Delete user attributes
try:
attributes = self.db.getRecordset("attributes", recordFilter={"userId": userId})
for attribute in attributes:
self.db.recordDelete("attributes", attribute["id"])
except Exception as e:
logger.error(f"Error deleting attributes for user {userId}: {e}")
# Other tables that might reference the user
# (Depending on the application's database structure)
logger.info(f"All referenced data for user {userId} has been deleted")
def deleteUser(self, userId: int) -> bool:
"""
Deletes a user and all associated data
Args:
userId: The ID of the user to delete
Returns:
bool: True if the user was successfully deleted, otherwise False
"""
# Check if the user exists
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
return False
# Check if it's the initial user
initialUserId = self.getInitialId("users")
if initialUserId is not None and userId == initialUserId:
logger.warning("Attempt to delete the Root Admin was prevented")
return False
# Delete all data associated with the user
self._deleteUserReferencedData(userId)
# Delete the user
success = self.db.recordDelete("users", userId)
if success:
logger.info(f"User with ID {userId} was successfully deleted")
else:
logger.error(f"Error deleting user with ID {userId}")
return success
# Singleton factory for GatewayInterface instances per context
_gatewayInterfaces = {}
def getGatewayInterface(mandateId: int = None, userId: int = None) -> GatewayInterface:
"""
Returns a GatewayInterface instance for the specified context.
Reuses existing instances.
Args:
mandateId: ID of the mandate
userId: ID of the user
Returns:
GatewayInterface instance
"""
contextKey = f"{mandateId}_{userId}"
if contextKey not in _gatewayInterfaces:
_gatewayInterfaces[contextKey] = GatewayInterface(mandateId, userId)
return _gatewayInterfaces[contextKey]
# Initialize the interface
getGatewayInterface()