1224 lines
45 KiB
Python
1224 lines
45 KiB
Python
"""
|
|
Interface to the Gateway system.
|
|
Manages users and mandates for authentication.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from passlib.context import CryptContext
|
|
import uuid
|
|
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.timezoneUtils import getUtcTimestamp
|
|
from modules.interfaces.interfaceDbAppAccess import AppAccess
|
|
from modules.datamodels.datamodelUam import (
|
|
User,
|
|
Mandate,
|
|
UserInDB,
|
|
UserConnection,
|
|
AuthAuthority,
|
|
UserPrivilege,
|
|
ConnectionStatus,
|
|
)
|
|
from modules.datamodels.datamodelSecurity import Token, AuthEvent, TokenStatus
|
|
from modules.datamodels.datamodelNeutralizer import (
|
|
DataNeutraliserConfig,
|
|
DataNeutralizerAttributes,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton factory for AppObjects instances per context
|
|
_gatewayInterfaces = {}
|
|
|
|
# Root interface instance
|
|
_rootAppObjects = None
|
|
|
|
# Password-Hashing
|
|
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
|
|
|
|
|
|
class AppObjects:
|
|
"""
|
|
Interface to the Gateway system.
|
|
Manages users and mandates.
|
|
"""
|
|
|
|
def __init__(self, currentUser: Optional[User] = None):
|
|
"""Initializes the Gateway Interface."""
|
|
# Initialize variables
|
|
self.currentUser = currentUser # Store User object directly
|
|
self.userId = currentUser.id if currentUser else None
|
|
self.mandateId = currentUser.mandateId if currentUser else None
|
|
self.access = None # Will be set when user context is provided
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
# Initialize standard records if needed
|
|
self._initRecords()
|
|
|
|
# Set user context if provided
|
|
if currentUser:
|
|
self.setUserContext(currentUser)
|
|
|
|
def setUserContext(self, currentUser: User):
|
|
"""Sets the user context for the interface."""
|
|
if not currentUser:
|
|
logger.info("Initializing interface without user context")
|
|
return
|
|
|
|
self.currentUser = currentUser # Store User object directly
|
|
self.userId = currentUser.id
|
|
self.mandateId = currentUser.mandateId
|
|
|
|
if not self.userId or not self.mandateId:
|
|
raise ValueError("Invalid user context: id and mandateId are required")
|
|
|
|
# Add language settings
|
|
self.userLanguage = currentUser.language # Default user language
|
|
|
|
# Initialize access control with user context
|
|
self.access = AppAccess(
|
|
self.currentUser, self.db
|
|
) # Convert to dict only when needed
|
|
|
|
# Update database context
|
|
self.db.updateContext(self.userId)
|
|
|
|
def __del__(self):
|
|
"""Cleanup method to close database connection."""
|
|
if hasattr(self, "db") and self.db is not None:
|
|
try:
|
|
self.db.close()
|
|
except Exception as e:
|
|
logger.error(f"Error closing database connection: {e}")
|
|
|
|
def _initializeDatabase(self):
|
|
"""Initializes the database connection directly."""
|
|
try:
|
|
# Get configuration values with defaults
|
|
dbHost = APP_CONFIG.get("DB_APP_HOST", "_no_config_default_data")
|
|
dbDatabase = APP_CONFIG.get("DB_APP_DATABASE", "app")
|
|
dbUser = APP_CONFIG.get("DB_APP_USER")
|
|
dbPassword = APP_CONFIG.get("DB_APP_PASSWORD_SECRET")
|
|
dbPort = int(APP_CONFIG.get("DB_APP_PORT", 5432))
|
|
|
|
# Create database connector directly
|
|
self.db = DatabaseConnector(
|
|
dbHost=dbHost,
|
|
dbDatabase=dbDatabase,
|
|
dbUser=dbUser,
|
|
dbPassword=dbPassword,
|
|
dbPort=dbPort,
|
|
userId=self.userId,
|
|
)
|
|
|
|
# Initialize database system
|
|
self.db.initDbSystem()
|
|
|
|
logger.info(f"Database initialized successfully for user {self.userId}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize database: {str(e)}")
|
|
raise
|
|
|
|
def _initRecords(self):
|
|
"""Initialize standard records if they don't exist."""
|
|
self._initRootMandate()
|
|
self._initAdminUser()
|
|
self._initEventUser()
|
|
|
|
def _initRootMandate(self):
|
|
"""Creates the Root mandate if it doesn't exist."""
|
|
existingMandateId = self.getInitialId(Mandate)
|
|
mandates = self.db.getRecordset(Mandate)
|
|
if existingMandateId is None or not mandates:
|
|
logger.info("Creating Root mandate")
|
|
rootMandate = Mandate(name="Root", language="en", enabled=True)
|
|
createdMandate = self.db.recordCreate(Mandate, rootMandate)
|
|
logger.info(f"Root mandate created with ID {createdMandate['id']}")
|
|
|
|
# Update mandate context
|
|
self.mandateId = createdMandate["id"]
|
|
|
|
def _initAdminUser(self):
|
|
"""Creates the Admin user if it doesn't exist."""
|
|
existingUserId = self.getInitialId(UserInDB)
|
|
users = self.db.getRecordset(UserInDB)
|
|
if existingUserId is None or not users:
|
|
logger.info("Creating Admin user")
|
|
adminUser = UserInDB(
|
|
mandateId=self.getInitialId(Mandate),
|
|
username="admin",
|
|
email="admin@example.com",
|
|
fullName="Administrator",
|
|
enabled=True,
|
|
language="en",
|
|
privilege=UserPrivilege.SYSADMIN,
|
|
authenticationAuthority="local", # Using lowercase value directly
|
|
hashedPassword=self._getPasswordHash(
|
|
APP_CONFIG.get("APP_INIT_PASS_ADMIN_SECRET")
|
|
),
|
|
connections=[],
|
|
)
|
|
createdUser = self.db.recordCreate(UserInDB, adminUser)
|
|
logger.info(f"Admin user created with ID {createdUser['id']}")
|
|
|
|
# Update user context
|
|
self.currentUser = createdUser
|
|
self.userId = createdUser.get("id")
|
|
|
|
def _initEventUser(self):
|
|
"""Creates the Event user if it doesn't exist."""
|
|
# Check if event user already exists
|
|
existingUsers = self.db.getRecordset(
|
|
UserInDB, recordFilter={"username": "event"}
|
|
)
|
|
if not existingUsers:
|
|
logger.info("Creating Event user")
|
|
eventUser = UserInDB(
|
|
mandateId=self.getInitialId(Mandate),
|
|
username="event",
|
|
email="event@example.com",
|
|
fullName="Event",
|
|
enabled=True,
|
|
language="en",
|
|
privilege=UserPrivilege.SYSADMIN,
|
|
authenticationAuthority="local", # Using lowercase value directly
|
|
hashedPassword=self._getPasswordHash(
|
|
APP_CONFIG.get("APP_INIT_PASS_EVENT_SECRET")
|
|
),
|
|
connections=[],
|
|
)
|
|
createdUser = self.db.recordCreate(UserInDB, eventUser)
|
|
logger.info(f"Event user created with ID {createdUser['id']}")
|
|
|
|
def _uam(
|
|
self, model_class: type, recordset: List[Dict[str, Any]]
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Unified user access management function that filters data based on user privileges
|
|
and adds access control attributes.
|
|
|
|
Args:
|
|
model_class: Pydantic model class for the table
|
|
recordset: Recordset to filter based on access rules
|
|
|
|
Returns:
|
|
Filtered recordset with access control attributes
|
|
"""
|
|
# First apply access control
|
|
filteredRecords = self.access.uam(model_class, recordset)
|
|
|
|
# Then filter out database-specific fields
|
|
cleanedRecords = []
|
|
for record in filteredRecords:
|
|
# Create a new dict with only non-database fields
|
|
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_")}
|
|
cleanedRecords.append(cleanedRecord)
|
|
|
|
return cleanedRecords
|
|
|
|
def _canModify(self, model_class: type, recordId: Optional[str] = None) -> bool:
|
|
"""
|
|
Checks if the current user can modify (create/update/delete) records in a table.
|
|
|
|
Args:
|
|
model_class: Pydantic model class for the table
|
|
recordId: Optional record ID for specific record check
|
|
|
|
Returns:
|
|
Boolean indicating permission
|
|
"""
|
|
return self.access.canModify(model_class, recordId)
|
|
|
|
def getInitialId(self, model_class: type) -> Optional[str]:
|
|
"""Returns the initial ID for a table."""
|
|
return self.db.getInitialId(model_class)
|
|
|
|
def _getPasswordHash(self, password: str) -> str:
|
|
"""Creates a hash for a password."""
|
|
return pwdContext.hash(password)
|
|
|
|
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
|
|
"""Checks if the password matches the hash."""
|
|
return pwdContext.verify(plainPassword, hashedPassword)
|
|
|
|
# User methods
|
|
|
|
def getUsersByMandate(self, mandateId: str) -> List[User]:
|
|
"""Returns users for a specific mandate if user has access."""
|
|
# Get users for this mandate
|
|
users = self.db.getRecordset(UserInDB, recordFilter={"mandateId": mandateId})
|
|
filteredUsers = self._uam(UserInDB, users)
|
|
|
|
# Convert to User models
|
|
return [User(**user) for user in filteredUsers]
|
|
|
|
def getUserByUsername(self, username: str) -> Optional[User]:
|
|
"""Returns a user by username."""
|
|
try:
|
|
# Get users table
|
|
users = self.db.getRecordset(UserInDB)
|
|
if not users:
|
|
return None
|
|
|
|
# Find user by username
|
|
for user_dict in users:
|
|
if user_dict.get("username") == username:
|
|
return User(**user_dict)
|
|
|
|
logger.info(f"No user found with username {username}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user by username: {str(e)}")
|
|
return None
|
|
|
|
def getUser(self, userId: str) -> Optional[User]:
|
|
"""Returns a user by ID if user has access."""
|
|
try:
|
|
# Get all users
|
|
users = self.db.getRecordset(UserInDB)
|
|
if not users:
|
|
return None
|
|
|
|
# Find user by ID
|
|
for user_dict in users:
|
|
if user_dict.get("id") == userId:
|
|
# Apply access control
|
|
filteredUsers = self._uam(UserInDB, [user_dict])
|
|
if filteredUsers:
|
|
return User(**filteredUsers[0])
|
|
return None
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user by ID: {str(e)}")
|
|
return None
|
|
|
|
def authenticateLocalUser(self, username: str, password: str) -> Optional[User]:
|
|
"""Authenticates a user by username and password using local authentication."""
|
|
# Clear the users table from cache and reload it
|
|
|
|
# Get user by username
|
|
user = self.getUserByUsername(username)
|
|
|
|
if not user:
|
|
raise ValueError("User not found")
|
|
|
|
# Check if the user is enabled
|
|
if not user.enabled:
|
|
raise ValueError("User is disabled")
|
|
|
|
# Verify that the user has local authentication enabled
|
|
if user.authenticationAuthority != AuthAuthority.LOCAL:
|
|
raise ValueError("User does not have local authentication enabled")
|
|
|
|
# Get the full user record with password hash for verification
|
|
userRecord = self.db.getRecordset(UserInDB, recordFilter={"id": user.id})[0]
|
|
if not userRecord.get("hashedPassword"):
|
|
raise ValueError("User has no password set")
|
|
|
|
if not self._verifyPassword(password, userRecord["hashedPassword"]):
|
|
raise ValueError("Invalid password")
|
|
|
|
return user
|
|
|
|
def createUser(
|
|
self,
|
|
username: str,
|
|
password: str = None,
|
|
email: str = None,
|
|
fullName: str = None,
|
|
language: str = "en",
|
|
enabled: bool = True,
|
|
privilege: UserPrivilege = UserPrivilege.USER,
|
|
authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL,
|
|
externalId: str = None,
|
|
externalUsername: str = None,
|
|
externalEmail: str = None,
|
|
) -> User:
|
|
"""Create a new user with optional external connection"""
|
|
try:
|
|
# Ensure username is a string
|
|
username = str(username).strip()
|
|
|
|
# Validate password for local authentication
|
|
if authenticationAuthority == AuthAuthority.LOCAL:
|
|
if not password:
|
|
raise ValueError("Password is required for local authentication")
|
|
if not isinstance(password, str):
|
|
raise ValueError("Password must be a string")
|
|
if not password.strip():
|
|
raise ValueError("Password cannot be empty")
|
|
|
|
# Create user data using UserInDB model
|
|
userData = UserInDB(
|
|
username=username,
|
|
email=email,
|
|
fullName=fullName,
|
|
language=language,
|
|
mandateId=self.mandateId,
|
|
enabled=enabled,
|
|
privilege=privilege,
|
|
authenticationAuthority=authenticationAuthority,
|
|
hashedPassword=self._getPasswordHash(password) if password else None,
|
|
connections=[],
|
|
)
|
|
|
|
# Create user record
|
|
createdRecord = self.db.recordCreate(UserInDB, userData)
|
|
if not createdRecord or not createdRecord.get("id"):
|
|
raise ValueError("Failed to create user record")
|
|
|
|
# Add external connection if provided
|
|
if externalId and externalUsername:
|
|
self.addUserConnection(
|
|
createdRecord["id"],
|
|
authenticationAuthority,
|
|
externalId,
|
|
externalUsername,
|
|
externalEmail,
|
|
)
|
|
|
|
# Get created user using the returned ID
|
|
createdUser = self.db.getRecordset(
|
|
UserInDB, recordFilter={"id": createdRecord["id"]}
|
|
)
|
|
if not createdUser or len(createdUser) == 0:
|
|
raise ValueError("Failed to retrieve created user")
|
|
|
|
# Clear cache to ensure fresh data (already done above)
|
|
|
|
return User(**createdUser[0])
|
|
|
|
except ValueError as e:
|
|
logger.error(f"Error creating user: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error creating user: {str(e)}")
|
|
raise ValueError(f"Failed to create user: {str(e)}")
|
|
|
|
def updateUser(self, userId: str, updateData: Dict[str, Any]) -> User:
|
|
"""Update a user's information"""
|
|
try:
|
|
# Get user
|
|
user = self.getUser(userId)
|
|
if not user:
|
|
raise ValueError(f"User {userId} not found")
|
|
|
|
# Update user data using model
|
|
updatedData = user.model_dump()
|
|
updatedData.update(updateData)
|
|
updatedUser = User(**updatedData)
|
|
|
|
# Update user record
|
|
self.db.recordModify(UserInDB, userId, updatedUser)
|
|
|
|
# Get updated user
|
|
updatedUser = self.getUser(userId)
|
|
if not updatedUser:
|
|
raise ValueError("Failed to retrieve updated user")
|
|
|
|
return updatedUser
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating user: {str(e)}")
|
|
raise ValueError(f"Failed to update user: {str(e)}")
|
|
|
|
def disableUser(self, userId: str) -> User:
|
|
"""Disables a user if current user has permission."""
|
|
return self.updateUser(userId, {"enabled": False})
|
|
|
|
def enableUser(self, userId: str) -> User:
|
|
"""Enables a user if current user has permission."""
|
|
return self.updateUser(userId, {"enabled": True})
|
|
|
|
def _deleteUserReferencedData(self, userId: str) -> None:
|
|
"""Deletes all data associated with a user."""
|
|
try:
|
|
# Delete user auth events
|
|
events = self.db.getRecordset(AuthEvent, recordFilter={"userId": userId})
|
|
for event in events:
|
|
self.db.recordDelete(AuthEvent, event["id"])
|
|
|
|
# Delete user tokens
|
|
tokens = self.db.getRecordset(Token, recordFilter={"userId": userId})
|
|
for token in tokens:
|
|
self.db.recordDelete(Token, token["id"])
|
|
|
|
# Delete user connections
|
|
connections = self.db.getRecordset(
|
|
UserConnection, recordFilter={"userId": userId}
|
|
)
|
|
for conn in connections:
|
|
self.db.recordDelete(UserConnection, conn["id"])
|
|
|
|
logger.info(f"All referenced data for user {userId} has been deleted")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting referenced data for user {userId}: {str(e)}")
|
|
raise
|
|
|
|
def deleteUser(self, userId: str) -> bool:
|
|
"""Deletes a user if current user has permission."""
|
|
try:
|
|
# Get user
|
|
user = self.getUser(userId)
|
|
if not user:
|
|
raise ValueError(f"User {userId} not found")
|
|
|
|
if not self._canModify(UserInDB, userId):
|
|
raise PermissionError(f"No permission to delete user {userId}")
|
|
|
|
# Delete all referenced data first
|
|
self._deleteUserReferencedData(userId)
|
|
|
|
# Delete user record
|
|
success = self.db.recordDelete(UserInDB, userId)
|
|
if not success:
|
|
raise ValueError(f"Failed to delete user {userId}")
|
|
|
|
logger.info(f"User {userId} successfully deleted")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting user: {str(e)}")
|
|
raise ValueError(f"Failed to delete user: {str(e)}")
|
|
|
|
def _getInitialUser(self) -> Optional[Dict[str, Any]]:
|
|
"""Get the initial user record directly from database without access control."""
|
|
try:
|
|
initialUserId = self.getInitialId(UserInDB)
|
|
if not initialUserId:
|
|
return None
|
|
|
|
users = self.db.getRecordset(UserInDB, recordFilter={"id": initialUserId})
|
|
return users[0] if users else None
|
|
except Exception as e:
|
|
logger.error(f"Error getting initial user: {str(e)}")
|
|
return None
|
|
|
|
def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Checks if a username is available for registration."""
|
|
try:
|
|
username = checkData.get("username")
|
|
authenticationAuthority = checkData.get("authenticationAuthority", "local")
|
|
|
|
if not username:
|
|
return {"available": False, "message": "Username is required"}
|
|
|
|
# Get user by username
|
|
user = self.getUserByUsername(username)
|
|
|
|
# Check if user exists (User model instance)
|
|
if user is not None:
|
|
return {"available": False, "message": "Username is already taken"}
|
|
|
|
return {"available": True, "message": "Username is available"}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking username availability: {str(e)}")
|
|
return {
|
|
"available": False,
|
|
"message": f"Error checking username availability: {str(e)}",
|
|
}
|
|
|
|
# Connection methods
|
|
|
|
def getUserConnections(self, userId: str) -> List[UserConnection]:
|
|
"""Returns all connections for a user."""
|
|
try:
|
|
# Get connections for this user
|
|
connections = self.db.getRecordset(
|
|
UserConnection, recordFilter={"userId": userId}
|
|
)
|
|
|
|
# Convert to UserConnection objects
|
|
result = []
|
|
for conn_dict in connections:
|
|
try:
|
|
# Create UserConnection object
|
|
connection = UserConnection(
|
|
id=conn_dict["id"],
|
|
userId=conn_dict["userId"],
|
|
authority=conn_dict.get("authority"),
|
|
externalId=conn_dict.get("externalId", ""),
|
|
externalUsername=conn_dict.get("externalUsername", ""),
|
|
externalEmail=conn_dict.get("externalEmail"),
|
|
status=conn_dict.get("status", "pending"),
|
|
connectedAt=conn_dict.get("connectedAt"),
|
|
lastChecked=conn_dict.get("lastChecked"),
|
|
expiresAt=conn_dict.get("expiresAt"),
|
|
)
|
|
result.append(connection)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error converting connection dict to object: {str(e)}"
|
|
)
|
|
continue
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user connections: {str(e)}")
|
|
return []
|
|
|
|
def addUserConnection(
|
|
self,
|
|
userId: str,
|
|
authority: AuthAuthority,
|
|
externalId: str,
|
|
externalUsername: str,
|
|
externalEmail: Optional[str] = None,
|
|
status: ConnectionStatus = ConnectionStatus.PENDING,
|
|
) -> UserConnection:
|
|
"""
|
|
Adds a new connection for a user.
|
|
|
|
Args:
|
|
userId: The ID of the user
|
|
authority: The authentication authority (e.g., MSFT, GOOGLE)
|
|
externalId: The external ID from the authority
|
|
externalUsername: The username from the authority
|
|
externalEmail: Optional email from the authority
|
|
status: The connection status (defaults to PENDING)
|
|
|
|
Returns:
|
|
The created UserConnection object
|
|
"""
|
|
try:
|
|
# Get the user
|
|
user = self.getUser(userId)
|
|
if not user:
|
|
raise ValueError(f"User not found: {userId}")
|
|
|
|
# Create new connection with all required fields
|
|
connection = UserConnection(
|
|
id=str(uuid.uuid4()),
|
|
userId=userId,
|
|
authority=authority,
|
|
externalId=externalId,
|
|
externalUsername=externalUsername,
|
|
externalEmail=externalEmail,
|
|
status=status,
|
|
connectedAt=getUtcTimestamp(),
|
|
lastChecked=getUtcTimestamp(),
|
|
expiresAt=None, # Optional field, set to None by default
|
|
)
|
|
|
|
# Save to connections table
|
|
self.db.recordCreate(UserConnection, connection)
|
|
|
|
return connection
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding user connection: {str(e)}")
|
|
raise ValueError(f"Failed to add user connection: {str(e)}")
|
|
|
|
def removeUserConnection(self, connectionId: str) -> None:
|
|
"""Remove a connection to an external service"""
|
|
try:
|
|
# Get connection
|
|
connections = self.db.getRecordset(
|
|
UserConnection, recordFilter={"id": connectionId}
|
|
)
|
|
|
|
if not connections:
|
|
raise ValueError(f"Connection {connectionId} not found")
|
|
|
|
# Delete connection
|
|
self.db.recordDelete(UserConnection, connectionId)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error removing user connection: {str(e)}")
|
|
raise ValueError(f"Failed to remove user connection: {str(e)}")
|
|
|
|
# Mandate methods
|
|
|
|
def getAllMandates(self) -> List[Mandate]:
|
|
"""Returns all mandates based on user access level."""
|
|
allMandates = self.db.getRecordset(Mandate)
|
|
filteredMandates = self._uam(Mandate, allMandates)
|
|
return [Mandate(**mandate) for mandate in filteredMandates]
|
|
|
|
def getMandate(self, mandateId: str) -> Optional[Mandate]:
|
|
"""Returns a mandate by ID if user has access."""
|
|
mandates = self.db.getRecordset(Mandate, recordFilter={"id": mandateId})
|
|
if not mandates:
|
|
return None
|
|
|
|
filteredMandates = self._uam(Mandate, mandates)
|
|
if not filteredMandates:
|
|
return None
|
|
|
|
return Mandate(**filteredMandates[0])
|
|
|
|
def createMandate(self, name: str, language: str = "en") -> Mandate:
|
|
"""Creates a new mandate if user has permission."""
|
|
if not self._canModify(Mandate):
|
|
raise PermissionError("No permission to create mandates")
|
|
|
|
# Create mandate data using model
|
|
mandateData = Mandate(name=name, language=language)
|
|
|
|
# Create mandate record
|
|
createdRecord = self.db.recordCreate(Mandate, mandateData)
|
|
if not createdRecord or not createdRecord.get("id"):
|
|
raise ValueError("Failed to create mandate record")
|
|
|
|
return Mandate(**createdRecord)
|
|
|
|
def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate:
|
|
"""Updates a mandate if user has access."""
|
|
try:
|
|
# First check if user has permission to modify mandates
|
|
if not self._canModify(Mandate, mandateId):
|
|
raise PermissionError(f"No permission to update mandate {mandateId}")
|
|
|
|
# Get mandate with access control
|
|
mandate = self.getMandate(mandateId)
|
|
if not mandate:
|
|
raise ValueError(f"Mandate {mandateId} not found")
|
|
|
|
# Update mandate data using model
|
|
updatedData = mandate.model_dump()
|
|
updatedData.update(updateData)
|
|
updatedMandate = Mandate(**updatedData)
|
|
|
|
# Update mandate record
|
|
self.db.recordModify(Mandate, mandateId, updatedMandate)
|
|
|
|
# Clear cache to ensure fresh data
|
|
|
|
# Get updated mandate
|
|
updatedMandate = self.getMandate(mandateId)
|
|
if not updatedMandate:
|
|
raise ValueError("Failed to retrieve updated mandate")
|
|
|
|
return updatedMandate
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating mandate: {str(e)}")
|
|
raise ValueError(f"Failed to update mandate: {str(e)}")
|
|
|
|
def deleteMandate(self, mandateId: str) -> bool:
|
|
"""Deletes a mandate if user has access."""
|
|
try:
|
|
# Check if mandate exists and user has access
|
|
mandate = self.getMandate(mandateId)
|
|
if not mandate:
|
|
return False
|
|
|
|
if not self._canModify(Mandate, mandateId):
|
|
raise PermissionError(f"No permission to delete mandate {mandateId}")
|
|
|
|
# Check if mandate has users
|
|
users = self.getUsersByMandate(mandateId)
|
|
if users:
|
|
raise ValueError(
|
|
f"Cannot delete mandate {mandateId} with existing users"
|
|
)
|
|
|
|
# Delete mandate
|
|
success = self.db.recordDelete(Mandate, mandateId)
|
|
|
|
# Clear cache to ensure fresh data
|
|
|
|
return success
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting mandate: {str(e)}")
|
|
raise ValueError(f"Failed to delete mandate: {str(e)}")
|
|
|
|
# Token methods
|
|
|
|
def saveAccessToken(self, token: Token, replace_existing: bool = True) -> None:
|
|
"""Save an access token for the current user (must NOT have connectionId)"""
|
|
try:
|
|
# Validate that this is NOT a connection token
|
|
if token.connectionId:
|
|
raise ValueError(
|
|
"Access tokens cannot have connectionId - use saveConnectionToken instead"
|
|
)
|
|
|
|
# Validate user context
|
|
if not self.currentUser or not self.currentUser.id:
|
|
raise ValueError("No valid user context available for token storage")
|
|
|
|
# Set the user ID and mandate ID
|
|
token.userId = self.currentUser.id
|
|
|
|
# Ensure token has required fields
|
|
if not token.id:
|
|
token.id = str(uuid.uuid4())
|
|
if not token.createdAt:
|
|
token.createdAt = getUtcTimestamp()
|
|
|
|
# If replace_existing is True, delete old access tokens for this user and authority first
|
|
if replace_existing:
|
|
try:
|
|
old_tokens = self.db.getRecordset(
|
|
Token,
|
|
recordFilter={
|
|
"userId": self.currentUser.id,
|
|
"authority": token.authority,
|
|
"connectionId": None, # Ensure we only delete access tokens
|
|
},
|
|
)
|
|
deleted_count = 0
|
|
for old_token in old_tokens:
|
|
if (
|
|
old_token["id"] != token.id
|
|
): # Don't delete the new token if it already exists
|
|
self.db.recordDelete(Token, old_token["id"])
|
|
deleted_count += 1
|
|
|
|
if deleted_count > 0:
|
|
logger.info(
|
|
f"Replaced {deleted_count} old access tokens for user {self.currentUser.id} and authority {token.authority}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to delete old access tokens for user {self.currentUser.id} and authority {token.authority}: {str(e)}"
|
|
)
|
|
# Continue with saving the new token even if deletion fails
|
|
|
|
# Convert to dict and ensure all fields are properly set
|
|
token_dict = token.model_dump()
|
|
# Ensure userId is set to current user
|
|
# Convert to dict and ensure all fields are properly set
|
|
token_dict = token.model_dump()
|
|
# Ensure userId is set to current user
|
|
token_dict["userId"] = self.currentUser.id
|
|
|
|
# Save to database
|
|
self.db.recordCreate(Token, token_dict)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving access token: {str(e)}")
|
|
raise
|
|
|
|
def saveConnectionToken(self, token: Token, replace_existing: bool = True) -> None:
|
|
"""Save a connection token (must have connectionId)"""
|
|
try:
|
|
# Validate that this IS a connection token
|
|
if not token.connectionId:
|
|
raise ValueError(
|
|
"Connection tokens must have connectionId - use saveAccessToken instead"
|
|
)
|
|
|
|
# Validate user context
|
|
if not self.currentUser or not self.currentUser.id:
|
|
raise ValueError("No valid user context available for token storage")
|
|
|
|
# Set the user ID for the connection token
|
|
token.userId = self.currentUser.id
|
|
|
|
# Ensure token has required fields
|
|
if not token.id:
|
|
token.id = str(uuid.uuid4())
|
|
if not token.createdAt:
|
|
token.createdAt = getUtcTimestamp()
|
|
|
|
# Convert to dict and ensure all fields are properly set
|
|
token_dict = token.model_dump()
|
|
# Ensure userId is set to current user
|
|
token_dict["userId"] = self.currentUser.id
|
|
|
|
# Save to database
|
|
self.db.recordCreate(Token, token_dict)
|
|
|
|
# After successful save, delete old tokens for this connectionId (if requested)
|
|
if replace_existing:
|
|
try:
|
|
old_tokens = self.db.getRecordset(
|
|
Token, recordFilter={"connectionId": token.connectionId}
|
|
)
|
|
deleted_count = 0
|
|
for old_token in old_tokens:
|
|
if old_token["id"] != token.id:
|
|
self.db.recordDelete(Token, old_token["id"])
|
|
deleted_count += 1
|
|
|
|
if deleted_count > 0:
|
|
logger.info(
|
|
f"Replaced {deleted_count} old tokens for connectionId {token.connectionId}"
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to delete old tokens for connectionId {token.connectionId}: {str(e)}"
|
|
)
|
|
# Keep the newly saved token; cleanup can be retried later
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving connection token: {str(e)}")
|
|
raise
|
|
|
|
def getConnectionToken(self, connectionId: str) -> Optional[Token]:
|
|
"""Get the latest stored token for a specific connectionId (no refresh)."""
|
|
try:
|
|
# Validate connectionId
|
|
if not connectionId:
|
|
raise ValueError("connectionId is required for getConnectionToken")
|
|
|
|
# Get token for this specific connection
|
|
# Query for specific connection
|
|
tokens = self.db.getRecordset(
|
|
Token, recordFilter={"connectionId": connectionId}
|
|
)
|
|
|
|
if not tokens:
|
|
logger.warning(
|
|
f"No connection token found for connectionId: {connectionId}"
|
|
)
|
|
return None
|
|
|
|
# Sort by expiration date and get the latest (most recent expiration)
|
|
tokens.sort(key=lambda x: x.get("expiresAt", 0), reverse=True)
|
|
latest_token = Token(**tokens[0])
|
|
|
|
# No auto-refresh here. Callers should use a higher-level service to refresh when needed.
|
|
|
|
return latest_token
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting connection token for connectionId {connectionId}: {str(e)}"
|
|
)
|
|
return None
|
|
|
|
def findActiveTokenById(
|
|
self,
|
|
tokenId: str,
|
|
userId: str,
|
|
authority: AuthAuthority,
|
|
sessionId: str = None,
|
|
mandateId: str = None,
|
|
) -> Optional[Token]:
|
|
"""Find an active access token by its id (jti) with optional session/tenant scoping."""
|
|
try:
|
|
recordFilter = {
|
|
"id": tokenId,
|
|
"userId": userId,
|
|
"authority": authority.value
|
|
if hasattr(authority, "value")
|
|
else str(authority),
|
|
"status": TokenStatus.ACTIVE,
|
|
}
|
|
if sessionId is not None:
|
|
recordFilter["sessionId"] = sessionId
|
|
if mandateId is not None:
|
|
recordFilter["mandateId"] = mandateId
|
|
tokens = self.db.getRecordset(Token, recordFilter=recordFilter)
|
|
if not tokens:
|
|
return None
|
|
return Token(**tokens[0])
|
|
except Exception as e:
|
|
logger.error(f"Error finding active token by id {tokenId}: {str(e)}")
|
|
return None
|
|
|
|
def revokeTokenById(self, tokenId: str, revokedBy: str, reason: str = None) -> bool:
|
|
"""Revoke a single token by id by setting status fields (no delete)."""
|
|
try:
|
|
existing = self.db.getRecordset(Token, recordFilter={"id": tokenId})
|
|
if not existing:
|
|
return False
|
|
token = existing[0]
|
|
if token.get("status") == TokenStatus.REVOKED:
|
|
return True
|
|
tokenUpdate = {
|
|
"status": TokenStatus.REVOKED,
|
|
"revokedAt": getUtcTimestamp(),
|
|
"revokedBy": revokedBy,
|
|
"reason": reason or "revoked",
|
|
}
|
|
self.db.recordModify(Token, tokenId, tokenUpdate)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error revoking token {tokenId}: {str(e)}")
|
|
return False
|
|
|
|
def revokeTokensBySessionId(
|
|
self,
|
|
sessionId: str,
|
|
userId: str,
|
|
authority: AuthAuthority,
|
|
revokedBy: str,
|
|
reason: str = None,
|
|
) -> int:
|
|
"""Revoke all tokens of a session for a user/authority."""
|
|
try:
|
|
tokens = self.db.getRecordset(
|
|
Token,
|
|
recordFilter={
|
|
"userId": userId,
|
|
"authority": authority.value
|
|
if hasattr(authority, "value")
|
|
else str(authority),
|
|
"sessionId": sessionId,
|
|
"status": TokenStatus.ACTIVE,
|
|
},
|
|
)
|
|
count = 0
|
|
for t in tokens:
|
|
self.db.recordModify(
|
|
Token,
|
|
t["id"],
|
|
{
|
|
"status": TokenStatus.REVOKED,
|
|
"revokedAt": getUtcTimestamp(),
|
|
"revokedBy": revokedBy,
|
|
"reason": reason or "session logout",
|
|
},
|
|
)
|
|
count += 1
|
|
return count
|
|
except Exception as e:
|
|
logger.error(f"Error revoking tokens for session {sessionId}: {str(e)}")
|
|
return 0
|
|
|
|
def revokeTokensByUser(
|
|
self,
|
|
userId: str,
|
|
authority: AuthAuthority = None,
|
|
mandateId: str = None,
|
|
revokedBy: str = None,
|
|
reason: str = None,
|
|
) -> int:
|
|
"""Revoke all active tokens for a user, optionally filtered by authority/mandate."""
|
|
try:
|
|
# Fetch all active tokens for user (optionally filtered by authority)
|
|
recordFilter = {
|
|
"userId": userId,
|
|
"status": TokenStatus.ACTIVE,
|
|
}
|
|
if authority is not None:
|
|
recordFilter["authority"] = (
|
|
authority.value if hasattr(authority, "value") else str(authority)
|
|
)
|
|
tokens = self.db.getRecordset(Token, recordFilter=recordFilter)
|
|
count = 0
|
|
for t in tokens:
|
|
self.db.recordModify(
|
|
Token,
|
|
t["id"],
|
|
{
|
|
"status": TokenStatus.REVOKED,
|
|
"revokedAt": getUtcTimestamp(),
|
|
"revokedBy": revokedBy,
|
|
"reason": reason or "admin revoke",
|
|
},
|
|
)
|
|
count += 1
|
|
return count
|
|
except Exception as e:
|
|
logger.error(f"Error revoking tokens for user {userId}: {str(e)}")
|
|
return 0
|
|
|
|
def cleanupExpiredTokens(self) -> int:
|
|
"""Clean up expired tokens for all connections, returns count of cleaned tokens"""
|
|
try:
|
|
current_time = getUtcTimestamp()
|
|
cleaned_count = 0
|
|
|
|
# Get all tokens
|
|
all_tokens = self.db.getRecordset(Token, recordFilter={})
|
|
|
|
for token_data in all_tokens:
|
|
if (
|
|
token_data.get("expiresAt")
|
|
and token_data.get("expiresAt") < current_time
|
|
):
|
|
# Token is expired, delete it
|
|
self.db.recordDelete(Token, token_data["id"])
|
|
cleaned_count += 1
|
|
|
|
# Clear cache to ensure fresh data
|
|
if cleaned_count > 0:
|
|
logger.info(f"Cleaned up {cleaned_count} expired tokens")
|
|
|
|
return cleaned_count
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error cleaning up expired tokens: {str(e)}")
|
|
return 0
|
|
|
|
def logout(self) -> None:
|
|
"""Logout current user - clear user context and tokens"""
|
|
try:
|
|
# Clear user context
|
|
self.currentUser = None
|
|
self.userId = None
|
|
self.mandateId = None
|
|
self.access = None
|
|
|
|
# Clear database context
|
|
if hasattr(self, "db"):
|
|
self.db.updateContext("")
|
|
|
|
logger.info("User logged out successfully")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during logout: {str(e)}")
|
|
raise
|
|
|
|
# Neutralization methods
|
|
|
|
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
|
|
"""Get the data neutralization configuration for the current user's mandate"""
|
|
try:
|
|
configs = self.db.getRecordset(
|
|
DataNeutraliserConfig, recordFilter={"mandateId": self.mandateId}
|
|
)
|
|
if not configs:
|
|
return None
|
|
|
|
# Apply access control
|
|
filtered_configs = self._uam(DataNeutraliserConfig, configs)
|
|
if not filtered_configs:
|
|
return None
|
|
|
|
return DataNeutraliserConfig(**filtered_configs[0])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting neutralization config: {str(e)}")
|
|
return None
|
|
|
|
def createOrUpdateNeutralizationConfig(
|
|
self, config_data: Dict[str, Any]
|
|
) -> DataNeutraliserConfig:
|
|
"""Create or update the data neutralization configuration"""
|
|
try:
|
|
# Check if config already exists
|
|
existing_config = self.getNeutralizationConfig()
|
|
|
|
if existing_config:
|
|
# Update existing config
|
|
update_data = existing_config.model_dump()
|
|
update_data.update(config_data)
|
|
update_data["updatedAt"] = getUtcTimestamp()
|
|
|
|
updated_config = DataNeutraliserConfig(**update_data)
|
|
self.db.recordModify(
|
|
DataNeutraliserConfig, existing_config.id, updated_config
|
|
)
|
|
|
|
return updated_config
|
|
else:
|
|
# Create new config
|
|
config_data["mandateId"] = self.mandateId
|
|
config_data["userId"] = self.userId
|
|
|
|
new_config = DataNeutraliserConfig(**config_data)
|
|
created_record = self.db.recordCreate(DataNeutraliserConfig, new_config)
|
|
|
|
return DataNeutraliserConfig(**created_record)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating/updating neutralization config: {str(e)}")
|
|
raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
|
|
|
|
def getNeutralizationAttributes(
|
|
self, file_id: Optional[str] = None
|
|
) -> List[DataNeutralizerAttributes]:
|
|
"""Get neutralization attributes, optionally filtered by file ID"""
|
|
try:
|
|
filter_dict = {"mandateId": self.mandateId}
|
|
if file_id:
|
|
filter_dict["fileId"] = file_id
|
|
|
|
attributes = self.db.getRecordset(
|
|
DataNeutralizerAttributes, recordFilter=filter_dict
|
|
)
|
|
filtered_attributes = self._uam(DataNeutralizerAttributes, attributes)
|
|
|
|
return [
|
|
DataNeutralizerAttributes(**attr)
|
|
for attr in filtered_attributes
|
|
]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting neutralization attributes: {str(e)}")
|
|
return []
|
|
|
|
def deleteNeutralizationAttributes(self, file_id: str) -> bool:
|
|
"""Delete all neutralization attributes for a specific file"""
|
|
try:
|
|
attributes = self.db.getRecordset(
|
|
DataNeutralizerAttributes,
|
|
recordFilter={"mandateId": self.mandateId, "fileId": file_id},
|
|
)
|
|
|
|
for attribute in attributes:
|
|
self.db.recordDelete(DataNeutralizerAttributes, attribute["id"])
|
|
|
|
logger.info(
|
|
f"Deleted {len(attributes)} neutralization attributes for file {file_id}"
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting neutralization attributes: {str(e)}")
|
|
return False
|
|
|
|
|
|
# Public Methods
|
|
|
|
|
|
def getInterface(currentUser: User) -> AppObjects:
|
|
"""
|
|
Returns a AppObjects instance for the current user.
|
|
Handles initialization of database and records.
|
|
"""
|
|
if not currentUser:
|
|
raise ValueError("Invalid user context: user is required")
|
|
|
|
# Create context key
|
|
contextKey = f"{currentUser.mandateId}_{currentUser.id}"
|
|
|
|
# Create new instance if not exists
|
|
if contextKey not in _gatewayInterfaces:
|
|
_gatewayInterfaces[contextKey] = AppObjects(currentUser)
|
|
|
|
return _gatewayInterfaces[contextKey]
|
|
|
|
|
|
def getRootInterface() -> AppObjects:
|
|
"""
|
|
Returns a AppObjects instance with root privileges.
|
|
This is used for initial setup and user creation.
|
|
"""
|
|
global _rootAppObjects
|
|
|
|
if _rootAppObjects is None:
|
|
try:
|
|
# Create a temporary interface without user context to get root user
|
|
tempInterface = AppObjects()
|
|
|
|
# Get the initial user directly
|
|
initialUserId = tempInterface.getInitialId(UserInDB)
|
|
if not initialUserId:
|
|
raise ValueError("No initial user ID found in database")
|
|
|
|
users = tempInterface.db.getRecordset(
|
|
UserInDB, recordFilter={"id": initialUserId}
|
|
)
|
|
if not users:
|
|
raise ValueError("Initial user not found in database")
|
|
|
|
# Convert to User model (use helper compatible with our models)
|
|
user_data = users[0]
|
|
rootUser = User(**user_data)
|
|
|
|
# Create root interface with the root user
|
|
_rootAppObjects = AppObjects(rootUser)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting root user: {str(e)}")
|
|
raise ValueError(f"Failed to get root user: {str(e)}")
|
|
|
|
return _rootAppObjects
|