prod azure 1.0.6

This commit is contained in:
ValueOn AG 2025-05-04 00:55:18 +02:00
parent 4c1d81160e
commit ff85bc0398
8 changed files with 2336 additions and 752 deletions

View file

@ -8,22 +8,10 @@ logger = logging.getLogger(__name__)
class DatabaseConnector:
"""
A connector for JSON-based data storage.
Provides generic database operations with tenant and user context support.
Provides generic database operations without user/mandate filtering.
"""
def __init__(self, dbHost: str, dbDatabase: str, dbUser: str = None, dbPassword: str = None,
mandateId: int = None, userId: int = None, skipInitialIdLookup: bool = False):
"""
Initializes the JSON database connector.
Args:
dbHost: Directory for the JSON files
dbDatabase: Database name
dbUser: Username for authentication (optional)
dbPassword: API key for authentication (optional)
mandateId: Context parameter for the tenant
userId: Context parameter for the user
skipInitialIdLookup: When True, skips looking up initial IDs for mandateId and userId
"""
# Store the input parameters
self.dbHost = dbHost
self.dbDatabase = dbDatabase
@ -177,35 +165,6 @@ class DatabaseConnector:
logger.error(f"Error saving table {table}: {e}")
return False
def _filterByContext(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filters records by tenant and user context,
if these fields exist in the record.
"""
filteredRecords = []
for record in records:
# Check if mandateId exists in the record and is not null
hasMandate = "mandateId" in record and record["mandateId"] is not None and record["mandateId"] != ""
# Check if userId exists in the record and is not null
hasUser = "userId" in record and record["userId"] is not None and record["userId"] != ""
# If both exist, filter accordingly
if hasMandate and hasUser:
if record["mandateId"] == self.mandateId:
filteredRecords.append(record)
# If only mandateId exists
elif hasMandate and not hasUser:
if record["mandateId"] == self.mandateId:
filteredRecords.append(record)
# If neither mandateId nor userId exist, add the record
elif not hasMandate and not hasUser:
filteredRecords.append(record)
return filteredRecords
def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Applies a record filter to the records"""
if not recordFilter:
@ -244,21 +203,10 @@ class DatabaseConnector:
return filteredRecords
def _registerInitialId(self, table: str, initialId: int) -> bool:
"""
Registers the initial ID for a table.
Args:
table: Name of the table
initialId: The initial ID
Returns:
True on success, False on error
"""
"""Registers the initial ID for a table."""
try:
# Load the current system table
systemData = self._loadSystemTable()
# Only register if not already present
if table not in systemData:
systemData[table] = initialId
success = self._saveSystemTable(systemData)
@ -271,20 +219,10 @@ class DatabaseConnector:
return False
def _removeInitialId(self, table: str) -> bool:
"""
Removes the initial ID for a table from the system table.
Args:
table: Name of the table
Returns:
True on success, False on error
"""
"""Removes the initial ID for a table from the system table."""
try:
# Load the current system table
systemData = self._loadSystemTable()
# Remove the entry if it exists
if table in systemData:
del systemData[table]
success = self._saveSystemTable(systemData)
@ -299,12 +237,7 @@ class DatabaseConnector:
# Public API
def getTables(self) -> List[str]:
"""
Returns a list of all available tables.
Returns:
List of table names
"""
"""Returns a list of all available tables."""
tables = []
try:
@ -318,38 +251,18 @@ class DatabaseConnector:
return tables
def getFields(self, table: str) -> List[str]:
"""
Returns a list of all fields in a table.
Args:
table: Name of the table
Returns:
List of field names
"""
# Load the table data
"""Returns a list of all fields in a table."""
data = self._loadTable(table)
if not data:
return []
# Take the first record as a reference for the fields
fields = list(data[0].keys()) if data else []
return fields
def getSchema(self, table: str, language: str = None) -> Dict[str, Dict[str, Any]]:
"""
Returns a schema object for a table with data types and labels.
Args:
table: Name of the table
language: Language for the labels (optional)
Returns:
Schema object with fields, data types and labels
"""
# Load the table data
"""Returns a schema object for a table with data types and labels."""
data = self._loadTable(table)
schema = {}
@ -357,14 +270,10 @@ class DatabaseConnector:
if not data:
return schema
# Take the first record as a reference for the fields and data types
firstRecord = data[0]
for field, value in firstRecord.items():
# Determine the data type
dataType = type(value).__name__
# Create label (default is the field name)
label = field
schema[field] = {
@ -375,32 +284,18 @@ class DatabaseConnector:
return schema
def getRecordset(self, table: str, fieldFilter: List[str] = None, recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Returns a list of records from a table, filtered by criteria.
Args:
table: Name of the table
fieldFilter: Filter for fields (which fields should be returned)
recordFilter: Filter for records (which records should be returned)
Returns:
List of filtered records
"""
# Load the table data
"""Returns a list of records from a table, filtered by criteria."""
data = self._loadTable(table)
logger.debug(f"getRecordset: data volume of {len(data)} bytes")
# Filter by tenant and user context
filteredData = self._filterByContext(data)
logger.debug(f"getRecordset: data volume of {len(data)} records")
# Apply recordFilter if available
if recordFilter:
filteredData = self._applyRecordFilter(filteredData, recordFilter)
data = self._applyRecordFilter(data, recordFilter)
# If fieldFilter is available, reduce the fields
if fieldFilter and isinstance(fieldFilter, list):
result = []
for record in filteredData:
for record in data:
filteredRecord = {}
for field in fieldFilter:
if field in record:
@ -408,23 +303,13 @@ class DatabaseConnector:
result.append(filteredRecord)
return result
return filteredData
return data
def recordCreate(self, table: str, recordData: Dict[str, Any]) -> Dict[str, Any]:
"""
Creates a new record in the table.
Args:
table: Name of the table
recordData: Data for the new record
Returns:
The created record
"""
# Load the table data
"""Creates a new record in the table."""
data = self._loadTable(table)
# Add mandateId and userId if not present or 0
# Add mandateId and userId if not present
if "mandateId" not in recordData or recordData["mandateId"] == 0:
recordData["mandateId"] = self.mandateId
@ -453,30 +338,15 @@ class DatabaseConnector:
raise ValueError(f"Error creating the record in table {table}")
def recordDelete(self, table: str, recordId: Union[str, int]) -> bool:
"""
Deletes a record from the table.
Args:
table: Name of the table
recordId: ID of the record to delete
Returns:
True on success, False on error
"""
# Load table data
"""Deletes a record from the table."""
data = self._loadTable(table)
# Search for the record
for i, record in enumerate(data):
if "id" in record and record["id"] == recordId:
# Check if the record belongs to the current mandate
if "mandateId" in record and record["mandateId"] != self.mandateId:
raise ValueError("Not your mandate")
# Check if it's an initial record
initialId = self.getInitialId(table)
if initialId is not None and initialId == recordId:
# Remove this entry from the system table
self._removeInitialId(table)
logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")
@ -490,27 +360,12 @@ class DatabaseConnector:
return False
def recordModify(self, table: str, recordId: Union[str, int], recordData: Dict[str, Any]) -> Dict[str, Any]:
"""
Modifies a record in the table.
Args:
table: Name of the table
recordId: ID of the record to modify
recordData: New data for the record
Returns:
The updated record
"""
# Load table data
"""Modifies a record in the table."""
data = self._loadTable(table)
# Search for the record
for i, record in enumerate(data):
if "id" in record and record["id"] == recordId:
# Check if the record belongs to the current mandate
if "mandateId" in record and record["mandateId"] != self.mandateId:
raise ValueError("Not your mandate")
# Prevent changing the ID
if "id" in recordData and recordData["id"] != recordId:
raise ValueError(f"The ID of a record in table {table} cannot be changed")
@ -529,28 +384,12 @@ class DatabaseConnector:
raise ValueError(f"Record with ID {recordId} not found in table {table}")
def hasInitialId(self, table: str) -> bool:
"""
Checks if an initial ID is registered for a table.
Args:
table: Name of the table
Returns:
True if an initial ID is registered, otherwise False
"""
"""Checks if an initial ID is registered for a table."""
systemData = self._loadSystemTable()
return table in systemData
def getInitialId(self, table: str) -> Optional[int]:
"""
Returns the initial ID for a table.
Args:
table: Name of the table
Returns:
The initial ID or None if not present
"""
"""Returns the initial ID for a table."""
systemData = self._loadSystemTable()
initialId = systemData.get(table)
logger.debug(f"Database '{self.dbDatabase}': Initial ID for table '{table}' is {initialId}")
@ -559,11 +398,6 @@ class DatabaseConnector:
return initialId
def getAllInitialIds(self) -> Dict[str, int]:
"""
Returns all registered initial IDs.
Returns:
Dictionary with table names as keys and initial IDs as values
"""
"""Returns all registered initial IDs."""
systemData = self._loadSystemTable()
return systemData.copy() # Return a copy to protect the original

File diff suppressed because it is too large Load diff

View file

@ -25,13 +25,7 @@ class GatewayInterface:
"""
def __init__(self, mandateId: int = None, userId: int = None):
"""
Initializes the Gateway Interface with optional mandate and user context.
Args:
mandateId: ID of the current mandate (optional)
userId: ID of the current user (optional)
"""
"""Initializes the Gateway Interface with optional mandate and user context."""
# Context can be empty during initialization
self.mandateId = mandateId
self.userId = userId
@ -46,12 +40,35 @@ class GatewayInterface:
# Initialize database
self._initializeDatabase()
# Load user information
self.currentUser = self._getCurrentUserInfo()
# Initialize standard records if needed
self._initRecords()
def _getCurrentUserInfo(self) -> Dict[str, Any]:
"""Gets information about the current user including privileges."""
# For initialization, set default values
userInfo = {
"id": self.userId,
"mandateId": self.mandateId,
"privilege": "user", # Default privilege level
"language": "en"
}
# Try to load actual user info if IDs are provided
if self.userId:
userRecords = self.db.getRecordset("users", recordFilter={"id": self.userId})
if userRecords:
user = userRecords[0]
userInfo["privilege"] = user.get("privilege", "user")
userInfo["language"] = user.get("language", "en")
return userInfo
def _initializeDatabase(self):
"""
Initializes the database with minimal objects
"""
"""Initializes the database connection."""
self.db = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
@ -61,7 +78,13 @@ class GatewayInterface:
userId=self.userId if self.userId else 0
)
# Create Root mandate if needed
def _initRecords(self):
"""Initializes standard records in the database if they don't exist."""
self._initRootMandate()
self._initAdminUser()
def _initRootMandate(self):
"""Creates the Root mandate if it doesn't exist."""
existingMandateId = self.getInitialId("mandates")
mandates = self.db.getRecordset("mandates")
if existingMandateId is None or not mandates:
@ -75,19 +98,9 @@ class GatewayInterface:
# Update mandate context
self.mandateId = createdMandate['id']
self.userId = createdMandate['userId']
# Recreate connector with correct context
self.db = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
mandateId=self.mandateId,
userId=self.userId
)
# Create Admin user if needed
def _initAdminUser(self):
"""Creates the Admin user if it doesn't exist."""
existingUserId = self.getInitialId("users")
users = self.db.getRecordset("users")
if existingUserId is None or not users:
@ -99,57 +112,127 @@ class GatewayInterface:
"fullName": "Administrator",
"disabled": False,
"language": "de",
"privilege": "sysadmin", # SysAdmin privilege
"hashedPassword": self._getPasswordHash("admin") # Use a secure password in production!
"privilege": "sysadmin",
"hashedPassword": self._getPasswordHash("The 1st Poweron Admin") # Use a secure password in production!
}
createdUser = self.db.recordCreate("users", adminUser)
logger.info(f"Admin user created with ID {createdUser['id']}")
# Update user context
self.userId = createdUser['id']
def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Unified user access management function that filters data based on user privileges.
Args:
table: Name of the table
recordset: Recordset to filter based on access rules
# Recreate connector with correct context
self.db = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_SYSTEM_HOST"),
dbDatabase=APP_CONFIG.get("DB_SYSTEM_DATABASE"),
dbUser=APP_CONFIG.get("DB_SYSTEM_USER"),
dbPassword=APP_CONFIG.get("DB_SYSTEM_PASSWORD_SECRET"),
mandateId=self.mandateId,
userId=self.userId
)
Returns:
Filtered recordset based on user privilege level
"""
userPrivilege = self.currentUser.get("privilege", "user")
# Apply filtering based on privilege
if userPrivilege == "sysadmin":
return recordset # System admins see all records
elif userPrivilege == "admin":
# Admins see records in their mandate
return [r for r in recordset if r.get("mandateId") == self.mandateId]
else: # Regular users
# Users only see records they own within their mandate
return [r for r in recordset
if r.get("mandateId") == self.mandateId and r.get("userId") == self.userId]
def _canModify(self, table: str, recordId: Optional[int] = None) -> bool:
"""
Checks if the current user can modify (create/update/delete) records in a table.
Args:
table: Name of the table
recordId: Optional record ID for specific record check
Returns:
Boolean indicating permission
"""
userPrivilege = self.currentUser.get("privilege", "user")
# System admins can modify anything
if userPrivilege == "sysadmin":
return True
# Check specific record permissions
if recordId is not None:
# Get the record to check ownership
records = self.db.getRecordset(table, recordFilter={"id": recordId})
if not records:
return False
record = records[0]
# Admins can modify anything in their mandate
if userPrivilege == "admin" and record.get("mandateId") == self.mandateId:
# Exception: Can't modify Root mandate unless you are a sysadmin
if table == "mandates" and recordId == 1 and userPrivilege != "sysadmin":
return False
return True
# Users can only modify their own records
if (record.get("mandateId") == self.mandateId and
record.get("userId") == self.userId):
return True
return False
else:
# For general table modify permission (e.g., create)
# Admins can create anything in their mandate
if userPrivilege == "admin":
return True
# Regular users can create most entities
if table == "mandates":
return False # Regular users can't create mandates
return True
def getInitialId(self, table: str) -> Optional[int]:
"""Returns the initial ID for a table"""
"""Returns the initial ID for a table."""
return self.db.getInitialId(table)
def _getPasswordHash(self, password: str) -> str:
"""Creates a hash for a password"""
"""Creates a hash for a password."""
return pwdContext.hash(password)
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
"""Checks if the password matches the hash"""
"""Checks if the password matches the hash."""
return pwdContext.verify(plainPassword, hashedPassword)
def _getCurrentTimestamp(self) -> str:
"""Returns the current timestamp in ISO format"""
"""Returns the current timestamp in ISO format."""
from datetime import datetime
return datetime.now().isoformat()
# Mandate methods
def getAllMandates(self) -> List[Dict[str, Any]]:
"""Returns all mandates"""
return self.db.getRecordset("mandates")
"""Returns mandates based on user access level."""
allMandates = self.db.getRecordset("mandates")
return self._uam("mandates", allMandates)
def getMandate(self, mandateId: int) -> Optional[Dict[str, Any]]:
"""Returns a mandate by its ID"""
"""Returns a mandate by ID if user has access."""
mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
if mandates:
return mandates[0]
return None
if not mandates:
return None
filteredMandates = self._uam("mandates", mandates)
return filteredMandates[0] if filteredMandates else None
def createMandate(self, name: str, language: str = "de") -> Dict[str, Any]:
"""Creates a new mandate"""
"""Creates a new mandate if user has permission."""
if not self._canModify("mandates"):
raise PermissionError("No permission to create mandates")
mandateData = {
"name": name,
"language": language
@ -158,43 +241,29 @@ class GatewayInterface:
return self.db.recordCreate("mandates", mandateData)
def updateMandate(self, mandateId: int, mandateData: Dict[str, Any]) -> Dict[str, Any]:
"""
Updates an existing mandate
Args:
mandateId: The ID of the mandate to update
mandateData: The mandate data to update
Returns:
Dict[str, Any]: The updated mandate data
Raises:
ValueError: If the mandate is not found
"""
# Check if the mandate exists
"""Updates a mandate if user has access."""
# Check if the mandate exists and user has access
mandate = self.getMandate(mandateId)
if not mandate:
raise ValueError(f"Mandate with ID {mandateId} not found")
if not self._canModify("mandates", mandateId):
raise PermissionError(f"No permission to update mandate {mandateId}")
# Update the mandate
updatedMandate = self.db.recordModify("mandates", mandateId, mandateData)
return updatedMandate
return self.db.recordModify("mandates", mandateId, mandateData)
def deleteMandate(self, mandateId: int) -> bool:
"""
Deletes a mandate and all associated users and data
Args:
mandateId: The ID of the mandate to delete
Returns:
bool: True if the mandate was successfully deleted, otherwise False
Deletes a mandate and all associated users and data if user has permission.
"""
# Check if the mandate exists
# Check if the mandate exists and user has access
mandate = self.getMandate(mandateId)
if not mandate:
return False
if not self._canModify("mandates", mandateId):
raise PermissionError(f"No permission to delete mandate {mandateId}")
# Check if it's the initial mandate
initialMandateId = self.getInitialId("mandates")
@ -222,33 +291,37 @@ class GatewayInterface:
# User methods
def getAllUsers(self) -> List[Dict[str, Any]]:
"""Returns all users"""
users = self.db.getRecordset("users")
# Remove password hashes from the response
for user in users:
"""Returns users based on user access level."""
allUsers = self.db.getRecordset("users")
filteredUsers = self._uam("users", allUsers)
# Remove password hashes
for user in filteredUsers:
if "hashedPassword" in user:
del user["hashedPassword"]
return users
return filteredUsers
def getUsersByMandate(self, mandateId: int) -> List[Dict[str, Any]]:
"""
Returns all users of a specific mandate
Args:
mandateId: The ID of the mandate
"""Returns users for a specific mandate if user has access."""
# First check if user has access to the mandate
mandate = self.getMandate(mandateId)
if not mandate:
return []
Returns:
List[Dict[str, Any]]: List of users in the mandate
"""
# Get users for this mandate
users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId})
# Remove password hashes from the response
for user in users:
filteredUsers = self._uam("users", users)
# Remove password hashes
for user in filteredUsers:
if "hashedPassword" in user:
del user["hashedPassword"]
return users
return filteredUsers
def getUserByUsername(self, username: str) -> Optional[Dict[str, Any]]:
"""Returns a user by username"""
"""Returns a user by username."""
users = self.db.getRecordset("users")
for user in users:
if user.get("username") == username:
@ -256,48 +329,49 @@ class GatewayInterface:
return None
def getUser(self, userId: int) -> Optional[Dict[str, Any]]:
"""Returns a user by ID"""
"""Returns a user by ID if user has access."""
users = self.db.getRecordset("users", recordFilter={"id": userId})
if users:
user = users[0]
# Remove password hash from the API response
if "hashedPassword" in user:
userCopy = user.copy()
del userCopy["hashedPassword"]
return userCopy
return user
return None
if not users:
return None
filteredUsers = self._uam("users", users)
if not filteredUsers:
return None
user = filteredUsers[0]
# Remove password hash
if "hashedPassword" in user:
userCopy = user.copy()
del userCopy["hashedPassword"]
return userCopy
return user
def createUser(self, username: str, password: str, email: str = None,
fullName: str = None, language: str = "de", mandateId: int = None,
disabled: bool = False, privilege: str = "user") -> Dict[str, Any]:
"""
Creates a new user
Args:
username: The username
password: The password
email: The email address (optional)
fullName: The full name (optional)
language: The preferred language (default: "de")
mandateId: The ID of the mandate (optional)
disabled: Whether the user is disabled (default: False)
privilege: The privilege level (default: "user")
Returns:
Dict[str, Any]: The created user data
Raises:
ValueError: If the username already exists
"""
"""Creates a new user if current user has permission."""
# Check if the username already exists
existingUser = self.getUserByUsername(username)
if existingUser:
raise ValueError(f"User '{username}' already exists")
# Use the provided mandateId or the current context
userMandateId = mandateId if mandateId is not None else self.mandateId
# Check if user has access to the mandate
if userMandateId != self.mandateId and self.currentUser.get("privilege") != "sysadmin":
raise PermissionError(f"No permission to create users in mandate {userMandateId}")
if not self._canModify("users"):
raise PermissionError("No permission to create users")
# Check privilege escalation
if (privilege == "sysadmin" or
(privilege == "admin" and self.currentUser.get("privilege") == "user")):
raise PermissionError(f"Cannot create user with higher privilege: {privilege}")
userData = {
"mandateId": userMandateId,
"username": username,
@ -318,16 +392,7 @@ class GatewayInterface:
return createdUser
def authenticateUser(self, username: str, password: str) -> Optional[Dict[str, Any]]:
"""
Authenticates a user by username and password
Args:
username: The username
password: The password
Returns:
Optional[Dict[str, Any]]: The user data or None if authentication fails
"""
"""Authenticates a user by username and password."""
user = self.getUserByUsername(username)
if not user:
@ -348,25 +413,29 @@ class GatewayInterface:
return authenticatedUser
def updateUser(self, userId: int, userData: Dict[str, Any]) -> Dict[str, Any]:
"""
Updates a user
"""Updates a user if current user has permission."""
# Check if the user exists and current user has access
user = self.getUser(userId)
if not user:
# Try to get the raw user record for admin access check
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
raise ValueError(f"User with ID {userId} not found")
# Check if current user is admin/sysadmin
if not self._canModify("users", userId):
raise PermissionError(f"No permission to update user {userId}")
user = users[0]
Args:
userId: The ID of the user to update
userData: The user data to update
# Check privilege escalation
if "privilege" in userData:
currentPrivilege = self.currentUser.get("privilege")
targetPrivilege = userData["privilege"]
Returns:
Dict[str, Any]: The updated user data
Raises:
ValueError: If the user is not found
"""
# Get the current user with password hash (directly from DB)
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
raise ValueError(f"User with ID {userId} not found")
user = users[0]
if (targetPrivilege == "sysadmin" and currentPrivilege != "sysadmin") or (
targetPrivilege == "admin" and currentPrivilege == "user"):
raise PermissionError(f"Cannot escalate privilege to {targetPrivilege}")
# If the password is being changed, hash it
if "password" in userData:
@ -383,22 +452,15 @@ class GatewayInterface:
return updatedUser
def disableUser(self, userId: int) -> Dict[str, Any]:
"""Disables a user"""
"""Disables a user if current user has permission."""
return self.updateUser(userId, {"disabled": True})
def enableUser(self, userId: int) -> Dict[str, Any]:
"""Enables a user"""
"""Enables a user if current user has permission."""
return self.updateUser(userId, {"disabled": False})
def _deleteUserReferencedData(self, userId: int) -> None:
"""
Deletes all data associated with a user
Args:
userId: The ID of the user
"""
# Here all tables are searched and all entries referencing this user are deleted
"""Deletes all data associated with a user."""
# Delete user attributes
try:
attributes = self.db.getRecordset("attributes", recordFilter={"userId": userId})
@ -407,25 +469,18 @@ class GatewayInterface:
except Exception as e:
logger.error(f"Error deleting attributes for user {userId}: {e}")
# Other tables that might reference the user
# (Depending on the application's database structure)
logger.info(f"All referenced data for user {userId} has been deleted")
def deleteUser(self, userId: int) -> bool:
"""
Deletes a user and all associated data
Args:
userId: The ID of the user to delete
Returns:
bool: True if the user was successfully deleted, otherwise False
"""
"""Deletes a user and all associated data if current user has permission."""
# Check if the user exists
users = self.db.getRecordset("users", recordFilter={"id": userId})
if not users:
return False
# Check if current user has permission
if not self._canModify("users", userId):
raise PermissionError(f"No permission to delete user {userId}")
# Check if it's the initial user
initialUserId = self.getInitialId("users")
@ -454,18 +509,11 @@ def getGatewayInterface(mandateId: int = None, userId: int = None) -> GatewayInt
"""
Returns a GatewayInterface instance for the specified context.
Reuses existing instances.
Args:
mandateId: ID of the mandate
userId: ID of the user
Returns:
GatewayInterface instance
"""
contextKey = f"{mandateId}_{userId}"
if contextKey not in _gatewayInterfaces:
_gatewayInterfaces[contextKey] = GatewayInterface(mandateId, userId)
return _gatewayInterfaces[contextKey]
# Initialize the interface
# Initialize an instance
getGatewayInterface()

File diff suppressed because it is too large Load diff

View file

@ -1 +0,0 @@
{'total_pixels': None, 'total_characters': None}

View file

@ -0,0 +1,38 @@
inputFiles = [] # DO NOT CHANGE THIS LINE
def is_prime(n):
if n <= 1:
return False
if n <= 3:
return True
if n % 2 == 0 or n % 3 == 0:
return False
i = 5
while i * i <= n:
if n % i == 0 or n % (i + 2) == 0:
return False
i += 6
return True
def generate_primes(count):
primes = []
num = 2
while len(primes) < count:
if is_prime(num):
primes.append(num)
num += 1
return primes
primes = generate_primes(1000)
prime_numbers_content = "\n".join(map(str, primes))
result = {
"prime_numbers.txt": {
"content": prime_numbers_content,
"base64Encoded": False,
"contentType": "text/plain"
}
}
import json
print(json.dumps(result))

File diff suppressed because one or more lines are too long

View file

@ -648,3 +648,353 @@
4813
4817
4831
4861
4871
4877
4889
4903
4909
4919
4931
4933
4937
4943
4951
4957
4967
4969
4973
4987
4993
4999
5003
5009
5011
5021
5023
5039
5051
5059
5077
5081
5087
5099
5101
5107
5113
5119
5147
5153
5167
5171
5179
5189
5197
5209
5227
5231
5233
5237
5261
5273
5279
5281
5297
5303
5309
5323
5333
5347
5351
5381
5387
5393
5399
5407
5413
5417
5419
5431
5437
5441
5443
5449
5471
5477
5479
5483
5501
5503
5507
5519
5521
5527
5531
5557
5563
5569
5573
5581
5591
5623
5639
5641
5647
5651
5653
5657
5659
5669
5683
5689
5693
5701
5711
5717
5737
5741
5743
5749
5779
5783
5791
5801
5807
5813
5821
5827
5839
5843
5849
5851
5857
5861
5867
5869
5879
5881
5897
5903
5923
5927
5939
5953
5981
5987
6007
6011
6029
6037
6043
6047
6053
6067
6073
6079
6089
6091
6101
6113
6121
6131
6133
6143
6151
6163
6173
6197
6199
6203
6211
6217
6221
6229
6247
6257
6263
6269
6271
6277
6287
6299
6301
6311
6317
6323
6329
6337
6343
6353
6359
6361
6367
6373
6379
6389
6397
6421
6427
6449
6451
6469
6473
6481
6491
6521
6529
6547
6551
6553
6563
6569
6571
6577
6581
6599
6607
6619
6637
6653
6659
6661
6673
6679
6689
6691
6701
6703
6709
6719
6733
6737
6761
6763
6779
6781
6791
6793
6803
6823
6827
6829
6833
6841
6857
6863
6869
6871
6883
6899
6907
6911
6917
6947
6949
6959
6961
6967
6971
6977
6983
6991
6997
7001
7013
7019
7027
7039
7043
7057
7069
7079
7103
7109
7121
7127
7129
7151
7159
7177
7187
7193
7207
7211
7213
7219
7229
7237
7243
7247
7253
7283
7297
7307
7309
7321
7331
7333
7349
7351
7369
7393
7411
7417
7433
7451
7457
7459
7477
7481
7487
7489
7499
7507
7517
7523
7529
7537
7541
7547
7549
7559
7561
7573
7577
7583
7589
7591
7603
7607
7621
7639
7643
7649
7669
7673
7681
7687
7691
7699
7703
7717
7723
7727
7741
7753
7757
7759
7789
7793
7817
7823
7829
7841
7853
7867
7873
7877
7879
7883
7901
7907
7919