From 4c1d81160e9af07a5f18d3d8ae8bf4479c317d67 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 4 May 2025 00:12:28 +0200
Subject: [PATCH] prod azure 1.0.5
---
connectors/BACKUP-connectorDbJson.py | 569 +++++++++++++++++++++++++++
connectors/connectorDbJson.py | 12 +-
modules/BACKUP-gatewayInterface.py | 471 ++++++++++++++++++++++
3 files changed, 1050 insertions(+), 2 deletions(-)
create mode 100644 connectors/BACKUP-connectorDbJson.py
create mode 100644 modules/BACKUP-gatewayInterface.py
diff --git a/connectors/BACKUP-connectorDbJson.py b/connectors/BACKUP-connectorDbJson.py
new file mode 100644
index 00000000..f4bdea80
--- /dev/null
+++ b/connectors/BACKUP-connectorDbJson.py
@@ -0,0 +1,569 @@
+import json
+import os
+from typing import List, Dict, Any, Optional, Union
+import logging
+
+logger = logging.getLogger(__name__)
+
+class DatabaseConnector:
+ """
+ A connector for JSON-based data storage.
+ Provides generic database operations with tenant and user context support.
+ """
+ def __init__(self, dbHost: str, dbDatabase: str, dbUser: str = None, dbPassword: str = None,
+ mandateId: int = None, userId: int = None, skipInitialIdLookup: bool = False):
+ """
+ Initializes the JSON database connector.
+
+ Args:
+ dbHost: Directory for the JSON files
+ dbDatabase: Database name
+ dbUser: Username for authentication (optional)
+ dbPassword: API key for authentication (optional)
+ mandateId: Context parameter for the tenant
+ userId: Context parameter for the user
+ skipInitialIdLookup: When True, skips looking up initial IDs for mandateId and userId
+ """
+ # Store the input parameters
+ self.dbHost = dbHost
+ self.dbDatabase = dbDatabase
+ self.dbUser = dbUser
+ self.dbPassword = dbPassword
+ self.skipInitialIdLookup = skipInitialIdLookup
+
+ # Check if context parameters are set
+ if mandateId is None or userId is None:
+ raise ValueError("mandateId and userId must be set")
+
+ # Ensure the database directory exists
+ self.dbFolder = os.path.join(self.dbHost, self.dbDatabase)
+ os.makedirs(self.dbFolder, exist_ok=True)
+
+ # Cache for loaded data
+ self._tablesCache = {}
+
+ # Initialize system table
+ self._systemTableName = "_system"
+ self._initializeSystemTable()
+
+ # Temporarily store mandateId and userId
+ self._mandateId = mandateId
+ self._userId = userId
+
+ # If mandateId or userId are 0 and we're not skipping ID lookup, try to use the initial IDs
+ if not skipInitialIdLookup:
+ if mandateId == 0:
+ initialMandateId = self.getInitialId("mandates")
+ if initialMandateId is not None:
+ self._mandateId = initialMandateId
+ logger.info(f"Using initial mandateId: {initialMandateId} instead of 0")
+
+ if userId == 0:
+ initialUserId = self.getInitialId("users")
+ if initialUserId is not None:
+ self._userId = initialUserId
+ logger.info(f"Using initial userId: {initialUserId} instead of 0")
+
+ # Set the effective IDs as properties
+ self.mandateId = self._mandateId
+ self.userId = self._userId
+
+ logger.info(f"DatabaseConnector initialized for directory: {self.dbFolder}")
+ logger.debug(f"Context: mandateId={self.mandateId}, userId={self.userId}")
+
+ def _initializeSystemTable(self):
+ """Initializes the system table if it doesn't exist yet."""
+ systemTablePath = self._getTablePath(self._systemTableName)
+ if not os.path.exists(systemTablePath):
+ emptySystemTable = {}
+ self._saveSystemTable(emptySystemTable)
+ logger.info(f"System table initialized in {systemTablePath}")
+ else:
+ # Load existing system table to ensure it's available
+ self._loadSystemTable()
+ logger.debug(f"Existing system table loaded from {systemTablePath}")
+
+ def _loadSystemTable(self) -> Dict[str, int]:
+ """Loads the system table with the initial IDs."""
+ # Check if system table is in cache
+ if f"_{self._systemTableName}" in self._tablesCache:
+ return self._tablesCache[f"_{self._systemTableName}"]
+
+ systemTablePath = self._getTablePath(self._systemTableName)
+ try:
+ if os.path.exists(systemTablePath):
+ with open(systemTablePath, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ # Store in cache with special prefix to avoid collision with regular tables
+ self._tablesCache[f"_{self._systemTableName}"] = data
+ return data
+ else:
+ self._tablesCache[f"_{self._systemTableName}"] = {}
+ return {}
+ except Exception as e:
+ logger.error(f"Error loading the system table: {e}")
+ self._tablesCache[f"_{self._systemTableName}"] = {}
+ return {}
+
+ def _saveSystemTable(self, data: Dict[str, int]) -> bool:
+ """Saves the system table with the initial IDs."""
+ systemTablePath = self._getTablePath(self._systemTableName)
+ try:
+ with open(systemTablePath, 'w', encoding='utf-8') as f:
+ json.dump(data, f, indent=2, ensure_ascii=False)
+ # Update cache
+ self._tablesCache[f"_{self._systemTableName}"] = data
+ return True
+ except Exception as e:
+ logger.error(f"Error saving the system table: {e}")
+ return False
+
+ def _getTablePath(self, table: str) -> str:
+ """Returns the full path to a table file"""
+ return os.path.join(self.dbFolder, f"{table}.json")
+
+ def _loadTable(self, table: str) -> List[Dict[str, Any]]:
+ """Loads a table from the corresponding JSON file"""
+ path = self._getTablePath(table)
+
+ # If the table is the system table, load it directly
+ if table == self._systemTableName:
+ return [] # The system table is not treated like normal tables
+
+ # If the table is already in the cache, use the cache
+ if table in self._tablesCache:
+ return self._tablesCache[table]
+
+ # Otherwise load the file
+ try:
+ if os.path.exists(path):
+ with open(path, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ self._tablesCache[table] = data
+
+ # If data was loaded and no initial ID is registered yet,
+ # register the ID of the first record (if available)
+ if data and not self.hasInitialId(table):
+ if "id" in data[0]:
+ self._registerInitialId(table, data[0]["id"])
+ logger.info(f"Initial ID {data[0]['id']} for table {table} retroactively registered")
+
+ return data
+ else:
+ # If the file doesn't exist, create an empty table
+ logger.info(f"New table {table}")
+ self._tablesCache[table] = []
+ self._saveTable(table, [])
+ return []
+ except Exception as e:
+ logger.error(f"Error loading table {table}: {e}")
+ return []
+
+ def _saveTable(self, table: str, data: List[Dict[str, Any]]) -> bool:
+ """Saves a table to the corresponding JSON file"""
+ # The system table is handled specially
+ if table == self._systemTableName:
+ return False
+
+ path = self._getTablePath(table)
+ try:
+ with open(path, 'w', encoding='utf-8') as f:
+ json.dump(data, f, indent=2, ensure_ascii=False)
+
+ # Update the cache
+ self._tablesCache[table] = data
+ return True
+ except Exception as e:
+ logger.error(f"Error saving table {table}: {e}")
+ return False
+
+ def _filterByContext(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """
+ Filters records by tenant and user context,
+ if these fields exist in the record.
+ """
+ filteredRecords = []
+
+ for record in records:
+ # Check if mandateId exists in the record and is not null
+ hasMandate = "mandateId" in record and record["mandateId"] is not None and record["mandateId"] != ""
+
+ # Check if userId exists in the record and is not null
+ hasUser = "userId" in record and record["userId"] is not None and record["userId"] != ""
+
+ # If both exist, filter accordingly
+ if hasMandate and hasUser:
+ if record["mandateId"] == self.mandateId:
+ filteredRecords.append(record)
+ # If only mandateId exists
+ elif hasMandate and not hasUser:
+ if record["mandateId"] == self.mandateId:
+ filteredRecords.append(record)
+ # If neither mandateId nor userId exist, add the record
+ elif not hasMandate and not hasUser:
+ filteredRecords.append(record)
+
+ return filteredRecords
+
+
+ def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
+ """Applies a record filter to the records"""
+ if not recordFilter:
+ return records
+
+ filteredRecords = []
+
+ for record in records:
+ match = True
+
+ for field, value in recordFilter.items():
+ # Check if the field exists
+ if field not in record:
+ match = False
+ break
+
+ # Handle type conversion for integer comparisons both ways
+ if isinstance(value, int) and isinstance(record[field], str) and record[field].isdigit():
+ # Filter value is int, record value is string
+ if value != int(record[field]):
+ match = False
+ break
+ elif isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
+ # Filter value is string, record value is int
+ if record[field] != int(value):
+ match = False
+ break
+ # Otherwise direct comparison
+ elif record[field] != value:
+ match = False
+ break
+
+ if match:
+ filteredRecords.append(record)
+
+ return filteredRecords
+
+ def _registerInitialId(self, table: str, initialId: int) -> bool:
+ """
+ Registers the initial ID for a table.
+
+ Args:
+ table: Name of the table
+ initialId: The initial ID
+
+ Returns:
+ True on success, False on error
+ """
+ try:
+ # Load the current system table
+ systemData = self._loadSystemTable()
+
+ # Only register if not already present
+ if table not in systemData:
+ systemData[table] = initialId
+ success = self._saveSystemTable(systemData)
+ if success:
+ logger.info(f"Initial ID {initialId} for table {table} registered")
+ return success
+ return True # If already present, this is not an error
+ except Exception as e:
+ logger.error(f"Error registering the initial ID for table {table}: {e}")
+ return False
+
+ def _removeInitialId(self, table: str) -> bool:
+ """
+ Removes the initial ID for a table from the system table.
+
+ Args:
+ table: Name of the table
+
+ Returns:
+ True on success, False on error
+ """
+ try:
+ # Load the current system table
+ systemData = self._loadSystemTable()
+
+ # Remove the entry if it exists
+ if table in systemData:
+ del systemData[table]
+ success = self._saveSystemTable(systemData)
+ if success:
+ logger.info(f"Initial ID for table {table} removed from system table")
+ return success
+ return True # If not present, this is not an error
+ except Exception as e:
+ logger.error(f"Error removing initial ID for table {table}: {e}")
+ return False
+
+ # Public API
+
+ def getTables(self) -> List[str]:
+ """
+ Returns a list of all available tables.
+
+ Returns:
+ List of table names
+ """
+ tables = []
+
+ try:
+ for filename in os.listdir(self.dbFolder):
+ if filename.endswith('.json') and not filename.startswith('_'):
+ tableName = filename[:-5] # Remove the .json extension
+ tables.append(tableName)
+ except Exception as e:
+ logger.error(f"Error reading the database directory: {e}")
+
+ return tables
+
+ def getFields(self, table: str) -> List[str]:
+ """
+ Returns a list of all fields in a table.
+
+ Args:
+ table: Name of the table
+
+ Returns:
+ List of field names
+ """
+ # Load the table data
+ data = self._loadTable(table)
+
+ if not data:
+ return []
+
+ # Take the first record as a reference for the fields
+ fields = list(data[0].keys()) if data else []
+
+ return fields
+
+ def getSchema(self, table: str, language: str = None) -> Dict[str, Dict[str, Any]]:
+ """
+ Returns a schema object for a table with data types and labels.
+
+ Args:
+ table: Name of the table
+ language: Language for the labels (optional)
+
+ Returns:
+ Schema object with fields, data types and labels
+ """
+ # Load the table data
+ data = self._loadTable(table)
+
+ schema = {}
+
+ if not data:
+ return schema
+
+ # Take the first record as a reference for the fields and data types
+ firstRecord = data[0]
+
+ for field, value in firstRecord.items():
+ # Determine the data type
+ dataType = type(value).__name__
+
+ # Create label (default is the field name)
+ label = field
+
+ schema[field] = {
+ "type": dataType,
+ "label": label
+ }
+
+ return schema
+
+ def getRecordset(self, table: str, fieldFilter: List[str] = None, recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
+ """
+ Returns a list of records from a table, filtered by criteria.
+
+ Args:
+ table: Name of the table
+ fieldFilter: Filter for fields (which fields should be returned)
+ recordFilter: Filter for records (which records should be returned)
+
+ Returns:
+ List of filtered records
+ """
+ # Load the table data
+ data = self._loadTable(table)
+ logger.debug(f"getRecordset: data volume of {len(data)} bytes")
+
+ # Filter by tenant and user context
+ filteredData = self._filterByContext(data)
+
+ # Apply recordFilter if available
+ if recordFilter:
+ filteredData = self._applyRecordFilter(filteredData, recordFilter)
+
+ # If fieldFilter is available, reduce the fields
+ if fieldFilter and isinstance(fieldFilter, list):
+ result = []
+ for record in filteredData:
+ filteredRecord = {}
+ for field in fieldFilter:
+ if field in record:
+ filteredRecord[field] = record[field]
+ result.append(filteredRecord)
+ return result
+
+ return filteredData
+
+ def recordCreate(self, table: str, recordData: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Creates a new record in the table.
+
+ Args:
+ table: Name of the table
+ recordData: Data for the new record
+
+ Returns:
+ The created record
+ """
+ # Load the table data
+ data = self._loadTable(table)
+
+ # Add mandateId and userId if not present or 0
+ if "mandateId" not in recordData or recordData["mandateId"] == 0:
+ recordData["mandateId"] = self.mandateId
+
+ if "userId" not in recordData or recordData["userId"] == 0:
+ recordData["userId"] = self.userId
+
+ # Determine the next ID if not present
+ if "id" not in recordData:
+ nextId = 1
+ if data:
+ nextId = max(record["id"] for record in data if "id" in record) + 1
+ recordData["id"] = nextId
+
+ # If the table is empty and a system ID should be registered
+ if not data:
+ self._registerInitialId(table, recordData["id"])
+ logger.info(f"Initial ID {recordData['id']} for table {table} has been registered")
+
+ # Add the new record
+ data.append(recordData)
+
+ # Save the updated table
+ if self._saveTable(table, data):
+ return recordData
+ else:
+ raise ValueError(f"Error creating the record in table {table}")
+
+ def recordDelete(self, table: str, recordId: Union[str, int]) -> bool:
+ """
+ Deletes a record from the table.
+
+ Args:
+ table: Name of the table
+ recordId: ID of the record to delete
+
+ Returns:
+ True on success, False on error
+ """
+ # Load table data
+ data = self._loadTable(table)
+
+ # Search for the record
+ for i, record in enumerate(data):
+ if "id" in record and record["id"] == recordId:
+ # Check if the record belongs to the current mandate
+ if "mandateId" in record and record["mandateId"] != self.mandateId:
+ raise ValueError("Not your mandate")
+
+ # Check if it's an initial record
+ initialId = self.getInitialId(table)
+ if initialId is not None and initialId == recordId:
+ # Remove this entry from the system table
+ self._removeInitialId(table)
+ logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")
+
+ # Delete the record
+ del data[i]
+
+ # Save the updated table
+ return self._saveTable(table, data)
+
+ # Record not found
+ return False
+
+ def recordModify(self, table: str, recordId: Union[str, int], recordData: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Modifies a record in the table.
+
+ Args:
+ table: Name of the table
+ recordId: ID of the record to modify
+ recordData: New data for the record
+
+ Returns:
+ The updated record
+ """
+ # Load table data
+ data = self._loadTable(table)
+
+ # Search for the record
+ for i, record in enumerate(data):
+ if "id" in record and record["id"] == recordId:
+ # Check if the record belongs to the current mandate
+ if "mandateId" in record and record["mandateId"] != self.mandateId:
+ raise ValueError("Not your mandate")
+
+ # Prevent changing the ID
+ if "id" in recordData and recordData["id"] != recordId:
+ raise ValueError(f"The ID of a record in table {table} cannot be changed")
+
+ # Update the record
+ for key, value in recordData.items():
+ data[i][key] = value
+
+ # Save the updated table
+ if self._saveTable(table, data):
+ return data[i]
+ else:
+ raise ValueError(f"Error updating record in table {table}")
+
+ # Record not found
+ raise ValueError(f"Record with ID {recordId} not found in table {table}")
+
+ def hasInitialId(self, table: str) -> bool:
+ """
+ Checks if an initial ID is registered for a table.
+
+ Args:
+ table: Name of the table
+
+ Returns:
+ True if an initial ID is registered, otherwise False
+ """
+ systemData = self._loadSystemTable()
+ return table in systemData
+
+ def getInitialId(self, table: str) -> Optional[int]:
+ """
+ Returns the initial ID for a table.
+
+ Args:
+ table: Name of the table
+
+ Returns:
+ The initial ID or None if not present
+ """
+ systemData = self._loadSystemTable()
+ initialId = systemData.get(table)
+ logger.debug(f"Database '{self.dbDatabase}': Initial ID for table '{table}' is {initialId}")
+ if initialId is None:
+ logger.debug(f"No initial ID found for table {table}")
+ return initialId
+
+ def getAllInitialIds(self) -> Dict[str, int]:
+ """
+ Returns all registered initial IDs.
+
+ Returns:
+ Dictionary with table names as keys and initial IDs as values
+ """
+ systemData = self._loadSystemTable()
+ return systemData.copy() # Return a copy to protect the original
\ No newline at end of file
diff --git a/connectors/connectorDbJson.py b/connectors/connectorDbJson.py
index 1a7a96cd..f4bdea80 100644
--- a/connectors/connectorDbJson.py
+++ b/connectors/connectorDbJson.py
@@ -205,6 +205,7 @@ class DatabaseConnector:
return filteredRecords
+
def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Applies a record filter to the records"""
if not recordFilter:
@@ -221,8 +222,14 @@ class DatabaseConnector:
match = False
break
- # If the filter value is an integer string and the record field is an integer
- if isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
+ # Handle type conversion for integer comparisons both ways
+ if isinstance(value, int) and isinstance(record[field], str) and record[field].isdigit():
+ # Filter value is int, record value is string
+ if value != int(record[field]):
+ match = False
+ break
+ elif isinstance(value, str) and value.isdigit() and isinstance(record[field], int):
+ # Filter value is string, record value is int
if record[field] != int(value):
match = False
break
@@ -381,6 +388,7 @@ class DatabaseConnector:
"""
# Load the table data
data = self._loadTable(table)
+ logger.debug(f"getRecordset: data volume of {len(data)} bytes")
# Filter by tenant and user context
filteredData = self._filterByContext(data)
diff --git a/modules/BACKUP-gatewayInterface.py b/modules/BACKUP-gatewayInterface.py
new file mode 100644
index 00000000..3e1120c7
--- /dev/null
+++ b/modules/BACKUP-gatewayInterface.py
@@ -0,0 +1,471 @@
+"""
+Interface to the Gateway system.
+Manages users and mandates for authentication.
+"""
+
+import os
+import logging
+from typing import Dict, Any, List, Optional, Union
+import importlib
+from passlib.context import CryptContext
+
+from connectors.connectorDbJson import DatabaseConnector
+from modules.configuration import APP_CONFIG
+
+logger = logging.getLogger(__name__)
+
+# Password-Hashing
+pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
+
+
+class GatewayInterface:
+ """
+ Interface to the Gateway system.
+ Manages users and mandates.
+ """
+
+ def __init__(self, mandateId: int = None, userId: int = None):
+ """
+ Initializes the Gateway Interface with optional mandate and user context.
+
+ Args:
+ mandateId: ID of the current mandate (optional)
+ userId: ID of the current user (optional)
+ """
+ # Context can be empty during initialization
+ self.mandateId = mandateId
+ self.userId = userId
+
+ # Import data model module
+ try:
+ self.modelModule = importlib.import_module("modules.gatewayModel")
+ logger.info("gatewayModel successfully imported")
+ except ImportError as e:
+ logger.error(f"Error importing gatewayModel: {e}")
+ raise
+
+ # Initialize database
+ self._initializeDatabase()
+
+ def _initializeDatabase(self):
+ """
+ Initializes the database with minimal objects
+ """
+
+ self.db = DatabaseConnector(
+ dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
+ dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
+ dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
+ dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
+ mandateId=self.mandateId if self.mandateId else 0,
+ userId=self.userId if self.userId else 0
+ )
+
+ # Create Root mandate if needed
+ existingMandateId = self.getInitialId("mandates")
+ mandates = self.db.getRecordset("mandates")
+ if existingMandateId is None or not mandates:
+ logger.info("Creating Root mandate")
+ rootMandate = {
+ "name": "Root",
+ "language": "de"
+ }
+ createdMandate = self.db.recordCreate("mandates", rootMandate)
+ logger.info(f"Root mandate created with ID {createdMandate['id']}")
+
+ # Update mandate context
+ self.mandateId = createdMandate['id']
+ self.userId = createdMandate['userId']
+
+ # Recreate connector with correct context
+ self.db = DatabaseConnector(
+ dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
+ dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
+ dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
+ dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
+
+ # Create Admin user if needed
+ existingUserId = self.getInitialId("users")
+ users = self.db.getRecordset("users")
+ if existingUserId is None or not users:
+ logger.info("Creating Admin user")
+ adminUser = {
+ "mandateId": self.mandateId,
+ "username": "admin",
+ "email": "admin@example.com",
+ "fullName": "Administrator",
+ "disabled": False,
+ "language": "de",
+ "privilege": "sysadmin", # SysAdmin privilege
+ "hashedPassword": self._getPasswordHash("admin") # Use a secure password in production!
+ }
+ createdUser = self.db.recordCreate("users", adminUser)
+ logger.info(f"Admin user created with ID {createdUser['id']}")
+
+ # Update user context
+ self.userId = createdUser['id']
+
+ # Recreate connector with correct context
+ self.db = DatabaseConnector(
+ dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
+ dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
+ dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
+ dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
+
+ def getInitialId(self, table: str) -> Optional[int]:
+ """Returns the initial ID for a table"""
+ return self.db.getInitialId(table)
+
+ def _getPasswordHash(self, password: str) -> str:
+ """Creates a hash for a password"""
+ return pwdContext.hash(password)
+
+ def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
+ """Checks if the password matches the hash"""
+ return pwdContext.verify(plainPassword, hashedPassword)
+
+ def _getCurrentTimestamp(self) -> str:
+ """Returns the current timestamp in ISO format"""
+ from datetime import datetime
+ return datetime.now().isoformat()
+
+ # Mandate methods
+
+ def getAllMandates(self) -> List[Dict[str, Any]]:
+ """Returns all mandates"""
+ return self.db.getRecordset("mandates")
+
+ def getMandate(self, mandateId: int) -> Optional[Dict[str, Any]]:
+ """Returns a mandate by its ID"""
+ mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
+ if mandates:
+ return mandates[0]
+ return None
+
+ def createMandate(self, name: str, language: str = "de") -> Dict[str, Any]:
+ """Creates a new mandate"""
+ mandateData = {
+ "name": name,
+ "language": language
+ }
+
+ return self.db.recordCreate("mandates", mandateData)
+
+ def updateMandate(self, mandateId: int, mandateData: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Updates an existing mandate
+
+ Args:
+ mandateId: The ID of the mandate to update
+ mandateData: The mandate data to update
+
+ Returns:
+ Dict[str, Any]: The updated mandate data
+
+ Raises:
+ ValueError: If the mandate is not found
+ """
+ # Check if the mandate exists
+ mandate = self.getMandate(mandateId)
+ if not mandate:
+ raise ValueError(f"Mandate with ID {mandateId} not found")
+
+ # Update the mandate
+ updatedMandate = self.db.recordModify("mandates", mandateId, mandateData)
+
+ return updatedMandate
+
+ def deleteMandate(self, mandateId: int) -> bool:
+ """
+ Deletes a mandate and all associated users and data
+
+ Args:
+ mandateId: The ID of the mandate to delete
+
+ Returns:
+ bool: True if the mandate was successfully deleted, otherwise False
+ """
+ # Check if the mandate exists
+ mandate = self.getMandate(mandateId)
+ if not mandate:
+ return False
+
+ # Check if it's the initial mandate
+ initialMandateId = self.getInitialId("mandates")
+ if initialMandateId is not None and mandateId == initialMandateId:
+ logger.warning(f"Attempt to delete the Root mandate was prevented")
+ return False
+
+ # Find all users of the mandate
+ users = self.getUsersByMandate(mandateId)
+
+ # Delete all users of the mandate and their associated data
+ for user in users:
+ self.deleteUser(user["id"])
+
+ # Delete the mandate
+ success = self.db.recordDelete("mandates", mandateId)
+
+ if success:
+ logger.info(f"Mandate with ID {mandateId} was successfully deleted")
+ else:
+ logger.error(f"Error deleting mandate with ID {mandateId}")
+
+ return success
+
+ # User methods
+
+ def getAllUsers(self) -> List[Dict[str, Any]]:
+ """Returns all users"""
+ users = self.db.getRecordset("users")
+ # Remove password hashes from the response
+ for user in users:
+ if "hashedPassword" in user:
+ del user["hashedPassword"]
+ return users
+
+ def getUsersByMandate(self, mandateId: int) -> List[Dict[str, Any]]:
+ """
+ Returns all users of a specific mandate
+
+ Args:
+ mandateId: The ID of the mandate
+
+ Returns:
+ List[Dict[str, Any]]: List of users in the mandate
+ """
+ users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId})
+ # Remove password hashes from the response
+ for user in users:
+ if "hashedPassword" in user:
+ del user["hashedPassword"]
+ return users
+
+ def getUserByUsername(self, username: str) -> Optional[Dict[str, Any]]:
+ """Returns a user by username"""
+ users = self.db.getRecordset("users")
+ for user in users:
+ if user.get("username") == username:
+ return user
+ return None
+
+ def getUser(self, userId: int) -> Optional[Dict[str, Any]]:
+ """Returns a user by ID"""
+ users = self.db.getRecordset("users", recordFilter={"id": userId})
+ if users:
+ user = users[0]
+ # Remove password hash from the API response
+ if "hashedPassword" in user:
+ userCopy = user.copy()
+ del userCopy["hashedPassword"]
+ return userCopy
+ return user
+ return None
+
+ def createUser(self, username: str, password: str, email: str = None,
+ fullName: str = None, language: str = "de", mandateId: int = None,
+ disabled: bool = False, privilege: str = "user") -> Dict[str, Any]:
+ """
+ Creates a new user
+
+ Args:
+ username: The username
+ password: The password
+ email: The email address (optional)
+ fullName: The full name (optional)
+ language: The preferred language (default: "de")
+ mandateId: The ID of the mandate (optional)
+ disabled: Whether the user is disabled (default: False)
+ privilege: The privilege level (default: "user")
+
+ Returns:
+ Dict[str, Any]: The created user data
+
+ Raises:
+ ValueError: If the username already exists
+ """
+ # Check if the username already exists
+ existingUser = self.getUserByUsername(username)
+ if existingUser:
+ raise ValueError(f"User '{username}' already exists")
+
+ # Use the provided mandateId or the current context
+ userMandateId = mandateId if mandateId is not None else self.mandateId
+
+ userData = {
+ "mandateId": userMandateId,
+ "username": username,
+ "email": email,
+ "fullName": fullName,
+ "disabled": disabled,
+ "language": language,
+ "privilege": privilege,
+ "hashedPassword": self._getPasswordHash(password)
+ }
+
+ createdUser = self.db.recordCreate("users", userData)
+
+ # Remove password hash from the response
+ if "hashedPassword" in createdUser:
+ del createdUser["hashedPassword"]
+
+ return createdUser
+
+ def authenticateUser(self, username: str, password: str) -> Optional[Dict[str, Any]]:
+ """
+ Authenticates a user by username and password
+
+ Args:
+ username: The username
+ password: The password
+
+ Returns:
+ Optional[Dict[str, Any]]: The user data or None if authentication fails
+ """
+ user = self.getUserByUsername(username)
+
+ if not user:
+ return None
+
+ if not self._verifyPassword(password, user.get("hashedPassword", "")):
+ return None
+
+ # Check if the user is disabled
+ if user.get("disabled", False):
+ return None
+
+ # Create a copy without password hash
+ authenticatedUser = {**user}
+ if "hashedPassword" in authenticatedUser:
+ del authenticatedUser["hashedPassword"]
+
+ return authenticatedUser
+
+ def updateUser(self, userId: int, userData: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Updates a user
+
+ Args:
+ userId: The ID of the user to update
+ userData: The user data to update
+
+ Returns:
+ Dict[str, Any]: The updated user data
+
+ Raises:
+ ValueError: If the user is not found
+ """
+ # Get the current user with password hash (directly from DB)
+ users = self.db.getRecordset("users", recordFilter={"id": userId})
+ if not users:
+ raise ValueError(f"User with ID {userId} not found")
+
+ user = users[0]
+
+ # If the password is being changed, hash it
+ if "password" in userData:
+ userData["hashedPassword"] = self._getPasswordHash(userData["password"])
+ del userData["password"]
+
+ # Update the user
+ updatedUser = self.db.recordModify("users", userId, userData)
+
+ # Remove password hash from the response
+ if "hashedPassword" in updatedUser:
+ del updatedUser["hashedPassword"]
+
+ return updatedUser
+
+ def disableUser(self, userId: int) -> Dict[str, Any]:
+ """Disables a user"""
+ return self.updateUser(userId, {"disabled": True})
+
+ def enableUser(self, userId: int) -> Dict[str, Any]:
+ """Enables a user"""
+ return self.updateUser(userId, {"disabled": False})
+
+ def _deleteUserReferencedData(self, userId: int) -> None:
+ """
+ Deletes all data associated with a user
+
+ Args:
+ userId: The ID of the user
+ """
+ # Here all tables are searched and all entries referencing this user are deleted
+
+ # Delete user attributes
+ try:
+ attributes = self.db.getRecordset("attributes", recordFilter={"userId": userId})
+ for attribute in attributes:
+ self.db.recordDelete("attributes", attribute["id"])
+ except Exception as e:
+ logger.error(f"Error deleting attributes for user {userId}: {e}")
+
+ # Other tables that might reference the user
+ # (Depending on the application's database structure)
+
+ logger.info(f"All referenced data for user {userId} has been deleted")
+
+ def deleteUser(self, userId: int) -> bool:
+ """
+ Deletes a user and all associated data
+
+ Args:
+ userId: The ID of the user to delete
+
+ Returns:
+ bool: True if the user was successfully deleted, otherwise False
+ """
+ # Check if the user exists
+ users = self.db.getRecordset("users", recordFilter={"id": userId})
+ if not users:
+ return False
+
+ # Check if it's the initial user
+ initialUserId = self.getInitialId("users")
+ if initialUserId is not None and userId == initialUserId:
+ logger.warning("Attempt to delete the Root Admin was prevented")
+ return False
+
+ # Delete all data associated with the user
+ self._deleteUserReferencedData(userId)
+
+ # Delete the user
+ success = self.db.recordDelete("users", userId)
+
+ if success:
+ logger.info(f"User with ID {userId} was successfully deleted")
+ else:
+ logger.error(f"Error deleting user with ID {userId}")
+
+ return success
+
+
+# Singleton factory for GatewayInterface instances per context
+_gatewayInterfaces = {}
+
+def getGatewayInterface(mandateId: int = None, userId: int = None) -> GatewayInterface:
+ """
+ Returns a GatewayInterface instance for the specified context.
+ Reuses existing instances.
+
+ Args:
+ mandateId: ID of the mandate
+ userId: ID of the user
+
+ Returns:
+ GatewayInterface instance
+ """
+ contextKey = f"{mandateId}_{userId}"
+ if contextKey not in _gatewayInterfaces:
+ _gatewayInterfaces[contextKey] = GatewayInterface(mandateId, userId)
+ return _gatewayInterfaces[contextKey]
+
+# Initialize the interface
+getGatewayInterface()
\ No newline at end of file