849 lines
34 KiB
Python
849 lines
34 KiB
Python
"""
|
|
Interface to the Gateway system.
|
|
Manages users and mandates for authentication.
|
|
"""
|
|
|
|
from datetime import datetime, timedelta, UTC
|
|
import os
|
|
import logging
|
|
from typing import Dict, Any, List, Optional, Union
|
|
import importlib
|
|
import json
|
|
from passlib.context import CryptContext
|
|
import uuid
|
|
|
|
from modules.connectors.connectorDbJson import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.interfaces.serviceAppAccess import AppAccess
|
|
from modules.interfaces.serviceAppModel import (
|
|
User, Mandate, UserInDB, UserConnection,
|
|
Session, AuthEvent, AuthAuthority, UserPrivilege,
|
|
ConnectionStatus, Token, LocalToken, GoogleToken, MsftToken
|
|
)
|
|
from modules.shared.attributeUtils import ModelMixin
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton factory for GatewayInterface instances per context
|
|
_gatewayInterfaces = {}
|
|
|
|
# Root interface instance
|
|
_rootGatewayInterface = None
|
|
|
|
# Password-Hashing
|
|
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
|
|
|
|
class GatewayInterface:
|
|
"""
|
|
Interface to the Gateway system.
|
|
Manages users and mandates.
|
|
"""
|
|
|
|
def __init__(self, currentUser: Optional[User] = None):
|
|
"""Initializes the Gateway Interface."""
|
|
# Initialize variables
|
|
self.currentUser = currentUser # Store User object directly
|
|
self.userId = currentUser.id if currentUser else None
|
|
self.mandateId = currentUser.mandateId if currentUser else None
|
|
self.access = None # Will be set when user context is provided
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
# Initialize standard records if needed
|
|
self._initRecords()
|
|
|
|
# Set user context if provided
|
|
if currentUser:
|
|
self.setUserContext(currentUser)
|
|
|
|
def setUserContext(self, currentUser: User):
|
|
"""Sets the user context for the interface."""
|
|
if not currentUser:
|
|
logger.info("Initializing interface without user context")
|
|
return
|
|
|
|
self.currentUser = currentUser # Store User object directly
|
|
self.userId = currentUser.id
|
|
self.mandateId = currentUser.mandateId
|
|
|
|
if not self.userId or not self.mandateId:
|
|
raise ValueError("Invalid user context: id and mandateId are required")
|
|
|
|
# Add language settings
|
|
self.userLanguage = currentUser.language # Default user language
|
|
|
|
# Initialize access control with user context
|
|
self.access = AppAccess(self.currentUser, self.db) # Convert to dict only when needed
|
|
|
|
# Update database context
|
|
self.db.updateContext(self.userId)
|
|
|
|
logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}")
|
|
|
|
def _initializeDatabase(self):
|
|
"""Initializes the database connection."""
|
|
try:
|
|
# Get configuration values with defaults
|
|
dbHost = APP_CONFIG.get("DB_APP_HOST", "_no_config_default_data")
|
|
dbDatabase = APP_CONFIG.get("DB_APP_DATABASE", "app")
|
|
dbUser = APP_CONFIG.get("DB_APP_USER")
|
|
dbPassword = APP_CONFIG.get("DB_APP_PASSWORD_SECRET")
|
|
|
|
# Ensure the database directory exists
|
|
os.makedirs(dbHost, exist_ok=True)
|
|
|
|
self.db = DatabaseConnector(
|
|
dbHost=dbHost,
|
|
dbDatabase=dbDatabase,
|
|
dbUser=dbUser,
|
|
dbPassword=dbPassword
|
|
)
|
|
|
|
logger.info("Database initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize database: {str(e)}")
|
|
raise
|
|
|
|
def _initRecords(self):
|
|
"""Initialize standard records if they don't exist."""
|
|
self._initRootMandate()
|
|
self._initAdminUser()
|
|
|
|
def _initRootMandate(self):
|
|
"""Creates the Root mandate if it doesn't exist."""
|
|
existingMandateId = self.getInitialId("mandates")
|
|
mandates = self.db.getRecordset("mandates")
|
|
if existingMandateId is None or not mandates:
|
|
logger.info("Creating Root mandate")
|
|
rootMandate = Mandate(
|
|
name="Root",
|
|
language="en",
|
|
enabled=True
|
|
)
|
|
createdMandate = self.db.recordCreate("mandates", rootMandate.to_dict())
|
|
logger.info(f"Root mandate created with ID {createdMandate['id']}")
|
|
|
|
# Register the initial ID
|
|
self.db._registerInitialId("mandates", createdMandate['id'])
|
|
|
|
# Update mandate context
|
|
self.mandateId = createdMandate['id']
|
|
|
|
def _initAdminUser(self):
|
|
"""Creates the Admin user if it doesn't exist."""
|
|
existingUserId = self.getInitialId("users")
|
|
users = self.db.getRecordset("users")
|
|
if existingUserId is None or not users:
|
|
logger.info("Creating Admin user")
|
|
adminUser = UserInDB(
|
|
mandateId=self.getInitialId("mandates"),
|
|
username="admin",
|
|
email="admin@example.com",
|
|
fullName="Administrator",
|
|
enabled=True,
|
|
language="en",
|
|
privilege=UserPrivilege.SYSADMIN,
|
|
authenticationAuthority="local", # Using lowercase value directly
|
|
hashedPassword=self._getPasswordHash("The 1st Poweron Admin"), # Use a secure password in production!
|
|
connections=[]
|
|
)
|
|
createdUser = self.db.recordCreate("users", adminUser.to_dict())
|
|
logger.info(f"Admin user created with ID {createdUser['id']}")
|
|
|
|
# Register the initial ID
|
|
self.db._registerInitialId("users", createdUser['id'])
|
|
|
|
# Update user context
|
|
self.currentUser = createdUser
|
|
self.userId = createdUser.get("id")
|
|
|
|
def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Unified user access management function that filters data based on user privileges
|
|
and adds access control attributes.
|
|
|
|
Args:
|
|
table: Name of the table
|
|
recordset: Recordset to filter based on access rules
|
|
|
|
Returns:
|
|
Filtered recordset with access control attributes
|
|
"""
|
|
# First apply access control
|
|
filteredRecords = self.access.uam(table, recordset)
|
|
|
|
# Then filter out database-specific fields
|
|
cleanedRecords = []
|
|
for record in filteredRecords:
|
|
# Create a new dict with only non-database fields
|
|
cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')}
|
|
cleanedRecords.append(cleanedRecord)
|
|
|
|
return cleanedRecords
|
|
|
|
def _canModify(self, table: str, recordId: Optional[str] = None) -> bool:
|
|
"""
|
|
Checks if the current user can modify (create/update/delete) records in a table.
|
|
|
|
Args:
|
|
table: Name of the table
|
|
recordId: Optional record ID for specific record check
|
|
|
|
Returns:
|
|
Boolean indicating permission
|
|
"""
|
|
return self.access.canModify(table, recordId)
|
|
|
|
def getInitialId(self, table: str) -> Optional[str]:
|
|
"""Returns the initial ID for a table."""
|
|
return self.db.getInitialId(table)
|
|
|
|
def _getPasswordHash(self, password: str) -> str:
|
|
"""Creates a hash for a password."""
|
|
return pwdContext.hash(password)
|
|
|
|
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
|
|
"""Checks if the password matches the hash."""
|
|
return pwdContext.verify(plainPassword, hashedPassword)
|
|
|
|
# User methods
|
|
|
|
def getAllUsers(self) -> List[User]:
|
|
"""Returns users based on user access level."""
|
|
allUsers = self.db.getRecordset("users")
|
|
filteredUsers = self._uam("users", allUsers)
|
|
|
|
# Convert to User models
|
|
return [User.from_dict(user) for user in filteredUsers]
|
|
|
|
def getUsersByMandate(self, mandateId: str) -> List[User]:
|
|
"""Returns users for a specific mandate if user has access."""
|
|
# Get users for this mandate
|
|
users = self.db.getRecordset("users", recordFilter={"mandateId": mandateId})
|
|
filteredUsers = self._uam("users", users)
|
|
|
|
# Convert to User models
|
|
return [User.from_dict(user) for user in filteredUsers]
|
|
|
|
def getUserByUsername(self, username: str) -> Optional[User]:
|
|
"""Returns a user by username."""
|
|
try:
|
|
# Get users table
|
|
users = self.db.getRecordset("users")
|
|
if not users:
|
|
return None
|
|
|
|
# Find user by username
|
|
for user_dict in users:
|
|
if user_dict.get("username") == username:
|
|
logger.info(f"Found user with username {username}")
|
|
return User.from_dict(user_dict)
|
|
|
|
logger.info(f"No user found with username {username}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user by username: {str(e)}")
|
|
return None
|
|
|
|
def getUser(self, userId: str) -> Optional[User]:
|
|
"""Returns a user by ID if user has access."""
|
|
try:
|
|
# Get all users
|
|
users = self.db.getRecordset("users")
|
|
if not users:
|
|
return None
|
|
|
|
# Find user by ID
|
|
for user_dict in users:
|
|
if user_dict.get("id") == userId:
|
|
# Apply access control
|
|
filteredUsers = self._uam("users", [user_dict])
|
|
if filteredUsers:
|
|
return User.from_dict(filteredUsers[0])
|
|
return None
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user by ID: {str(e)}")
|
|
return None
|
|
|
|
def getUserConnections(self, userId: str) -> List[UserConnection]:
|
|
"""Returns all connections for a user."""
|
|
try:
|
|
# Get connections for this user
|
|
connections = self.db.getRecordset("connections", recordFilter={"userId": userId})
|
|
|
|
# Convert to UserConnection objects
|
|
result = []
|
|
for conn_dict in connections:
|
|
try:
|
|
# Convert string dates to datetime objects
|
|
for field in ['connectedAt', 'lastChecked', 'expiresAt']:
|
|
if field in conn_dict and conn_dict[field]:
|
|
try:
|
|
if isinstance(conn_dict[field], str):
|
|
conn_dict[field] = datetime.fromisoformat(conn_dict[field].replace('Z', '+00:00'))
|
|
except (ValueError, TypeError):
|
|
conn_dict[field] = None
|
|
|
|
# Create UserConnection object
|
|
connection = UserConnection(
|
|
id=conn_dict["id"],
|
|
userId=conn_dict["userId"],
|
|
authority=conn_dict.get("authority"),
|
|
externalId=conn_dict.get("externalId", ""),
|
|
externalUsername=conn_dict.get("externalUsername", ""),
|
|
externalEmail=conn_dict.get("externalEmail"),
|
|
status=conn_dict.get("status", "pending"),
|
|
connectedAt=conn_dict.get("connectedAt"),
|
|
lastChecked=conn_dict.get("lastChecked"),
|
|
expiresAt=conn_dict.get("expiresAt")
|
|
)
|
|
result.append(connection)
|
|
except Exception as e:
|
|
logger.error(f"Error converting connection dict to object: {str(e)}")
|
|
continue
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user connections: {str(e)}")
|
|
return []
|
|
|
|
def addUserConnection(self, userId: str, authority: AuthAuthority, externalId: str,
|
|
externalUsername: str, externalEmail: Optional[str] = None,
|
|
status: ConnectionStatus = ConnectionStatus.PENDING) -> UserConnection:
|
|
"""
|
|
Adds a new connection for a user.
|
|
|
|
Args:
|
|
userId: The ID of the user
|
|
authority: The authentication authority (e.g., MSFT, GOOGLE)
|
|
externalId: The external ID from the authority
|
|
externalUsername: The username from the authority
|
|
externalEmail: Optional email from the authority
|
|
status: The connection status (defaults to PENDING)
|
|
|
|
Returns:
|
|
The created UserConnection object
|
|
"""
|
|
try:
|
|
# Get the user
|
|
user = self.getUser(userId)
|
|
if not user:
|
|
raise ValueError(f"User not found: {userId}")
|
|
|
|
# Create new connection with all required fields
|
|
connection = UserConnection(
|
|
id=str(uuid.uuid4()),
|
|
userId=userId,
|
|
authority=authority,
|
|
externalId=externalId,
|
|
externalUsername=externalUsername,
|
|
externalEmail=externalEmail,
|
|
status=status,
|
|
connectedAt=datetime.now(UTC),
|
|
lastChecked=datetime.now(UTC),
|
|
expiresAt=None # Optional field, set to None by default
|
|
)
|
|
|
|
# Save to connections table
|
|
self.db.recordCreate("connections", connection.to_dict())
|
|
|
|
return connection
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding user connection: {str(e)}")
|
|
raise ValueError(f"Failed to add user connection: {str(e)}")
|
|
|
|
def removeUserConnection(self, connectionId: str) -> None:
|
|
"""Remove a connection to an external service"""
|
|
try:
|
|
# Get connection
|
|
connections = self.db.getRecordset("connections", recordFilter={
|
|
"id": connectionId
|
|
})
|
|
|
|
if not connections:
|
|
raise ValueError(f"Connection {connectionId} not found")
|
|
|
|
# Delete connection
|
|
self.db.recordDelete("connections", connectionId)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error removing user connection: {str(e)}")
|
|
raise ValueError(f"Failed to remove user connection: {str(e)}")
|
|
|
|
def authenticateLocalUser(self, username: str, password: str) -> Optional[User]:
|
|
"""Authenticates a user by username and password using local authentication."""
|
|
# Clear the users table from cache and reload it
|
|
if "users" in self.db._tablesCache:
|
|
del self.db._tablesCache["users"]
|
|
|
|
# Get user by username
|
|
user = self.getUserByUsername(username)
|
|
|
|
if not user:
|
|
raise ValueError("User not found")
|
|
|
|
# Check if the user is enabled
|
|
if not user.enabled:
|
|
raise ValueError("User is disabled")
|
|
|
|
# Verify that the user has local authentication enabled
|
|
if user.authenticationAuthority != AuthAuthority.LOCAL:
|
|
raise ValueError("User does not have local authentication enabled")
|
|
|
|
# Get the full user record with password hash for verification
|
|
userRecord = self.db.getRecordset("users", recordFilter={"id": user.id})[0]
|
|
if not userRecord.get("hashedPassword"):
|
|
raise ValueError("User has no password set")
|
|
|
|
if not self._verifyPassword(password, userRecord["hashedPassword"]):
|
|
raise ValueError("Invalid password")
|
|
|
|
return user
|
|
|
|
def createUser(self, username: str, password: str = None, email: str = None,
|
|
fullName: str = None, language: str = "en", enabled: bool = True,
|
|
privilege: UserPrivilege = UserPrivilege.USER,
|
|
authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL,
|
|
externalId: str = None, externalUsername: str = None,
|
|
externalEmail: str = None) -> User:
|
|
"""Create a new user with optional external connection"""
|
|
try:
|
|
# Ensure username is a string
|
|
username = str(username).strip()
|
|
|
|
# Validate password for local authentication
|
|
if authenticationAuthority == AuthAuthority.LOCAL:
|
|
if not password:
|
|
raise ValueError("Password is required for local authentication")
|
|
if not isinstance(password, str):
|
|
raise ValueError("Password must be a string")
|
|
if not password.strip():
|
|
raise ValueError("Password cannot be empty")
|
|
|
|
# Create user data using UserInDB model
|
|
userData = UserInDB(
|
|
username=username,
|
|
email=email,
|
|
fullName=fullName,
|
|
language=language,
|
|
mandateId=self.mandateId,
|
|
enabled=enabled,
|
|
privilege=privilege,
|
|
authenticationAuthority=authenticationAuthority,
|
|
hashedPassword=self._getPasswordHash(password) if password else None,
|
|
connections=[]
|
|
)
|
|
|
|
# Create user record
|
|
createdRecord = self.db.recordCreate("users", userData.to_dict())
|
|
if not createdRecord or not createdRecord.get("id"):
|
|
raise ValueError("Failed to create user record")
|
|
|
|
# Add external connection if provided
|
|
if externalId and externalUsername:
|
|
self.addUserConnection(
|
|
createdRecord["id"],
|
|
authenticationAuthority,
|
|
externalId,
|
|
externalUsername,
|
|
externalEmail
|
|
)
|
|
|
|
# Get created user using the returned ID
|
|
createdUser = self.db.getRecordset("users", recordFilter={"id": createdRecord["id"]})
|
|
if not createdUser or len(createdUser) == 0:
|
|
raise ValueError("Failed to retrieve created user")
|
|
|
|
# Clear both table and metadata caches
|
|
if hasattr(self.db, '_tablesCache') and "users" in self.db._tablesCache:
|
|
del self.db._tablesCache["users"]
|
|
if hasattr(self.db, '_tableMetadataCache') and "users" in self.db._tableMetadataCache:
|
|
del self.db._tableMetadataCache["users"]
|
|
|
|
return User.from_dict(createdUser[0])
|
|
|
|
except ValueError as e:
|
|
logger.error(f"Error creating user: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error creating user: {str(e)}")
|
|
raise ValueError(f"Failed to create user: {str(e)}")
|
|
|
|
def updateUser(self, userId: str, updateData: Dict[str, Any]) -> User:
|
|
"""Update a user's information"""
|
|
try:
|
|
# Get user
|
|
user = self.getUser(userId)
|
|
if not user:
|
|
raise ValueError(f"User {userId} not found")
|
|
|
|
# Update user data using model
|
|
updatedData = user.to_dict()
|
|
updatedData.update(updateData)
|
|
updatedUser = User.from_dict(updatedData)
|
|
|
|
# Update user record
|
|
self.db.recordModify("users", userId, updatedUser.to_dict())
|
|
|
|
# Get updated user
|
|
updatedUser = self.getUser(userId)
|
|
if not updatedUser:
|
|
raise ValueError("Failed to retrieve updated user")
|
|
|
|
return updatedUser
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating user: {str(e)}")
|
|
raise ValueError(f"Failed to update user: {str(e)}")
|
|
|
|
def disableUser(self, userId: str) -> User:
|
|
"""Disables a user if current user has permission."""
|
|
return self.updateUser(userId, {"enabled": False})
|
|
|
|
def enableUser(self, userId: str) -> User:
|
|
"""Enables a user if current user has permission."""
|
|
return self.updateUser(userId, {"enabled": True})
|
|
|
|
def _deleteUserReferencedData(self, userId: str) -> None:
|
|
"""Deletes all data associated with a user."""
|
|
try:
|
|
# Delete user sessions
|
|
sessions = self.db.getRecordset("sessions", recordFilter={"userId": userId})
|
|
for session in sessions:
|
|
self.db.recordDelete("sessions", session["id"])
|
|
logger.debug(f"Deleted session {session['id']} for user {userId}")
|
|
|
|
# Delete user auth events
|
|
events = self.db.getRecordset("auth_events", recordFilter={"userId": userId})
|
|
for event in events:
|
|
self.db.recordDelete("auth_events", event["id"])
|
|
logger.debug(f"Deleted auth event {event['id']} for user {userId}")
|
|
|
|
# Delete user tokens
|
|
tokens = self.db.getRecordset("tokens", recordFilter={"userId": userId})
|
|
for token in tokens:
|
|
self.db.recordDelete("tokens", token["id"])
|
|
logger.debug(f"Deleted token {token['id']} for user {userId}")
|
|
|
|
# Delete user connections
|
|
connections = self.db.getRecordset("connections", recordFilter={"userId": userId})
|
|
for conn in connections:
|
|
self.db.recordDelete("connections", conn["id"])
|
|
logger.debug(f"Deleted connection {conn['id']} for user {userId}")
|
|
|
|
logger.info(f"All referenced data for user {userId} has been deleted")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting referenced data for user {userId}: {str(e)}")
|
|
raise
|
|
|
|
def deleteUser(self, userId: str) -> bool:
|
|
"""Deletes a user if current user has permission."""
|
|
try:
|
|
# Get user
|
|
user = self.getUser(userId)
|
|
if not user:
|
|
raise ValueError(f"User {userId} not found")
|
|
|
|
if not self._canModify("users", userId):
|
|
raise PermissionError(f"No permission to delete user {userId}")
|
|
|
|
# Delete all referenced data first
|
|
self._deleteUserReferencedData(userId)
|
|
|
|
# Delete user record
|
|
success = self.db.recordDelete("users", userId)
|
|
if not success:
|
|
raise ValueError(f"Failed to delete user {userId}")
|
|
|
|
# Clear both table and metadata caches
|
|
if hasattr(self.db, '_tablesCache') and "users" in self.db._tablesCache:
|
|
del self.db._tablesCache["users"]
|
|
if hasattr(self.db, '_tableMetadataCache') and "users" in self.db._tableMetadataCache:
|
|
del self.db._tableMetadataCache["users"]
|
|
|
|
logger.info(f"User {userId} successfully deleted")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting user: {str(e)}")
|
|
raise ValueError(f"Failed to delete user: {str(e)}")
|
|
|
|
# Mandate methods
|
|
|
|
def getAllMandates(self) -> List[Mandate]:
|
|
"""Returns all mandates based on user access level."""
|
|
allMandates = self.db.getRecordset("mandates")
|
|
filteredMandates = self._uam("mandates", allMandates)
|
|
return [Mandate.from_dict(mandate) for mandate in filteredMandates]
|
|
|
|
def getMandate(self, mandateId: str) -> Optional[Mandate]:
|
|
"""Returns a mandate by ID if user has access."""
|
|
mandates = self.db.getRecordset("mandates", recordFilter={"id": mandateId})
|
|
if not mandates:
|
|
return None
|
|
|
|
filteredMandates = self._uam("mandates", mandates)
|
|
if not filteredMandates:
|
|
return None
|
|
|
|
return Mandate.from_dict(filteredMandates[0])
|
|
|
|
def createMandate(self, name: str, language: str = "en") -> Mandate:
|
|
"""Creates a new mandate if user has permission."""
|
|
if not self._canModify("mandates"):
|
|
raise PermissionError("No permission to create mandates")
|
|
|
|
# Create mandate data using model
|
|
mandateData = Mandate(
|
|
name=name,
|
|
language=language
|
|
)
|
|
|
|
# Create mandate record
|
|
createdRecord = self.db.recordCreate("mandates", mandateData.to_dict())
|
|
if not createdRecord or not createdRecord.get("id"):
|
|
raise ValueError("Failed to create mandate record")
|
|
|
|
return Mandate.from_dict(createdRecord)
|
|
|
|
def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate:
|
|
"""Updates a mandate if user has access."""
|
|
try:
|
|
logger.debug(f"Updating mandate {mandateId} with data: {updateData}")
|
|
|
|
# First check if user has permission to modify mandates
|
|
if not self._canModify("mandates", mandateId):
|
|
raise PermissionError(f"No permission to update mandate {mandateId}")
|
|
|
|
# Get mandate with access control
|
|
mandate = self.getMandate(mandateId)
|
|
logger.debug(f"Retrieved mandate: {mandate}")
|
|
if not mandate:
|
|
raise ValueError(f"Mandate {mandateId} not found")
|
|
|
|
# Update mandate data using model
|
|
updatedData = mandate.to_dict()
|
|
updatedData.update(updateData)
|
|
logger.debug(f"Updated data: {updatedData}")
|
|
updatedMandate = Mandate.from_dict(updatedData)
|
|
|
|
# Update mandate record
|
|
self.db.recordModify("mandates", mandateId, updatedMandate.to_dict())
|
|
|
|
# Get updated mandate
|
|
updatedMandate = self.getMandate(mandateId)
|
|
if not updatedMandate:
|
|
raise ValueError("Failed to retrieve updated mandate")
|
|
|
|
return updatedMandate
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating mandate: {str(e)}")
|
|
raise ValueError(f"Failed to update mandate: {str(e)}")
|
|
|
|
def deleteMandate(self, mandateId: str) -> bool:
|
|
"""Deletes a mandate if user has access."""
|
|
try:
|
|
# Check if mandate exists and user has access
|
|
mandate = self.getMandate(mandateId)
|
|
if not mandate:
|
|
return False
|
|
|
|
if not self._canModify("mandates", mandateId):
|
|
raise PermissionError(f"No permission to delete mandate {mandateId}")
|
|
|
|
# Check if mandate has users
|
|
users = self.getUsersByMandate(mandateId)
|
|
if users:
|
|
raise ValueError(f"Cannot delete mandate {mandateId} with existing users")
|
|
|
|
# Delete mandate
|
|
return self.db.recordDelete("mandates", mandateId)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting mandate: {str(e)}")
|
|
raise ValueError(f"Failed to delete mandate: {str(e)}")
|
|
|
|
def _getInitialUser(self) -> Optional[Dict[str, Any]]:
|
|
"""Get the initial user record directly from database without access control."""
|
|
try:
|
|
initialUserId = self.db.getInitialId("users")
|
|
if not initialUserId:
|
|
return None
|
|
|
|
users = self.db.getRecordset("users", recordFilter={"id": initialUserId})
|
|
return users[0] if users else None
|
|
except Exception as e:
|
|
logger.error(f"Error getting initial user: {str(e)}")
|
|
return None
|
|
|
|
def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Checks if a username is available for registration."""
|
|
try:
|
|
username = checkData.get("username")
|
|
authenticationAuthority = checkData.get("authenticationAuthority", "local")
|
|
|
|
if not username:
|
|
return {
|
|
"available": False,
|
|
"message": "Username is required"
|
|
}
|
|
|
|
# Get user by username
|
|
user = self.getUserByUsername(username)
|
|
|
|
# Check if user exists (User model instance)
|
|
if user is not None:
|
|
return {
|
|
"available": False,
|
|
"message": "Username is already taken"
|
|
}
|
|
|
|
return {
|
|
"available": True,
|
|
"message": "Username is available"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking username availability: {str(e)}")
|
|
return {
|
|
"available": False,
|
|
"message": f"Error checking username availability: {str(e)}"
|
|
}
|
|
|
|
def saveToken(self, token: Token) -> None:
|
|
"""Save a token for the current user"""
|
|
try:
|
|
# Validate user context
|
|
if not self.currentUser or not self.currentUser.id:
|
|
raise ValueError("No valid user context available for token storage")
|
|
|
|
# Set the user ID and mandate ID
|
|
token.userId = self.currentUser.id
|
|
|
|
# Ensure token has required fields
|
|
if not token.id:
|
|
token.id = str(uuid.uuid4())
|
|
if not token.createdAt:
|
|
token.createdAt = datetime.now()
|
|
|
|
# Convert to dict and ensure all fields are properly set
|
|
token_dict = token.dict()
|
|
token_dict["userId"] = self.currentUser.id
|
|
|
|
# Convert datetime objects to ISO format strings
|
|
if isinstance(token_dict.get("createdAt"), datetime):
|
|
token_dict["createdAt"] = token_dict["createdAt"].isoformat()
|
|
if isinstance(token_dict.get("expiresAt"), datetime):
|
|
token_dict["expiresAt"] = token_dict["expiresAt"].isoformat()
|
|
|
|
# Save to database
|
|
self.db.recordCreate("tokens", token_dict)
|
|
|
|
logger.debug(f"Token saved for user {self.currentUser.id} with authority {token.authority}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving token: {str(e)}")
|
|
raise
|
|
|
|
def getToken(self, authority: AuthAuthority) -> Optional[Token]:
|
|
"""Get the latest token for the current user and authority"""
|
|
try:
|
|
# Get tokens for this user and authority
|
|
tokens = self.db.getRecordset("tokens", recordFilter={
|
|
"userId": self.currentUser.id,
|
|
"authority": authority
|
|
})
|
|
|
|
if not tokens:
|
|
return None
|
|
|
|
# Sort by creation date and get the latest
|
|
tokens.sort(key=lambda x: x.get("createdAt", ""), reverse=True)
|
|
return Token(**tokens[0])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting token: {str(e)}")
|
|
return None
|
|
|
|
def deleteToken(self, authority: AuthAuthority) -> None:
|
|
"""Delete all tokens for the current user and authority"""
|
|
try:
|
|
# Get tokens to delete
|
|
tokens = self.db.getRecordset("tokens", recordFilter={
|
|
"userId": self.currentUser.id,
|
|
"authority": authority
|
|
})
|
|
|
|
# Delete each token
|
|
for token in tokens:
|
|
self.db.recordDelete("tokens", token["id"])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting token: {str(e)}")
|
|
raise
|
|
|
|
# Public Methods
|
|
|
|
def getInterface(currentUser: User) -> GatewayInterface:
|
|
"""
|
|
Returns a GatewayInterface instance for the current user.
|
|
Handles initialization of database and records.
|
|
"""
|
|
if not currentUser:
|
|
raise ValueError("Invalid user context: user is required")
|
|
|
|
# Create context key
|
|
contextKey = f"{currentUser.mandateId}_{currentUser.id}"
|
|
|
|
# Create new instance if not exists
|
|
if contextKey not in _gatewayInterfaces:
|
|
_gatewayInterfaces[contextKey] = GatewayInterface(currentUser)
|
|
|
|
return _gatewayInterfaces[contextKey]
|
|
|
|
def getRootUser() -> User:
|
|
"""
|
|
Returns the root user from the database.
|
|
This is the user with the initial ID in the users table.
|
|
"""
|
|
try:
|
|
# Create a temporary interface without user context
|
|
tempInterface = GatewayInterface()
|
|
|
|
# Get the initial user directly
|
|
initialUserId = tempInterface.db.getInitialId("users")
|
|
if not initialUserId:
|
|
raise ValueError("No initial user ID found in database")
|
|
|
|
users = tempInterface.db.getRecordset("users", recordFilter={"id": initialUserId})
|
|
if not users:
|
|
raise ValueError("Initial user not found in database")
|
|
|
|
# Convert to User model and return the model instance
|
|
return User.from_dict(users[0])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting root user: {str(e)}")
|
|
raise ValueError(f"Failed to get root user: {str(e)}")
|
|
|
|
def getRootInterface() -> GatewayInterface:
|
|
"""
|
|
Returns a GatewayInterface instance with root privileges.
|
|
This is used for initial setup and user creation.
|
|
"""
|
|
global _rootGatewayInterface
|
|
|
|
if _rootGatewayInterface is None:
|
|
rootUser = getRootUser()
|
|
_rootGatewayInterface = GatewayInterface(rootUser)
|
|
|
|
return _rootGatewayInterface
|