gateway/modules/interfaces/interfaceAppObjects.py
2025-09-17 02:12:34 +02:00

1261 lines
52 KiB
Python

"""
Interface to the Gateway system.
Manages users and mandates for authentication.
"""
from datetime import datetime, timedelta, UTC
import os
import logging
from typing import Dict, Any, List, Optional, Union
import importlib
import json
from passlib.context import CryptContext
import uuid
import re
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import get_utc_now, get_utc_timestamp
from modules.interfaces.interfaceAppAccess import AppAccess
from modules.interfaces.interfaceAppModel import (
User, Mandate, UserInDB, UserConnection,
AuthAuthority, UserPrivilege,
ConnectionStatus, Token, AuthEvent,
DataNeutraliserConfig, DataNeutralizerAttributes
)
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Singleton factory for AppObjects instances per context
# NOTE(review): nothing in the visible part of the file populates this dict;
# presumably it is filled by a factory function further down - confirm.
_gatewayInterfaces = {}
# Root interface instance (starts unset; not assigned in the visible part of the file)
_rootAppObjects = None
# Password hashing context; argon2 is the only configured scheme
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
class AppObjects:
"""
Interface to the Gateway system.
Manages users and mandates.
"""
def __init__(self, currentUser: Optional[User] = None):
    """
    Build the interface, connect to the database and seed standard records.

    Args:
        currentUser: Optional user on whose behalf the interface operates.
            When given, the full user context is applied after the
            database has been initialised.
    """
    self.currentUser = currentUser
    if currentUser:
        self.userId = currentUser.id
        self.mandateId = currentUser.mandateId
    else:
        self.userId = None
        self.mandateId = None
    # Access control is only available once a user context has been applied
    self.access = None

    # The database has to exist before the standard records can be checked
    self._initializeDatabase()
    self._initRecords()

    if currentUser:
        self.setUserContext(currentUser)
def setUserContext(self, currentUser: User):
    """
    Bind the interface to a user.

    Stores the user, its id, mandate id and language, creates the access
    control helper and forwards the user id to the database connector.

    Args:
        currentUser: The user to operate as. A falsy value is logged and ignored.

    Raises:
        ValueError: If the user has no id or no mandateId.
    """
    if not currentUser:
        logger.info("Initializing interface without user context")
        return

    self.currentUser = currentUser
    self.userId = currentUser.id
    self.mandateId = currentUser.mandateId
    if not (self.userId and self.mandateId):
        raise ValueError("Invalid user context: id and mandateId are required")

    # Language of the bound user
    self.userLanguage = currentUser.language

    # Access control works on the user object plus the database connector
    self.access = AppAccess(self.currentUser, self.db)

    # Let the connector know which user is active
    self.db.updateContext(self.userId)
def __del__(self):
    """Close the database connection, if one was opened, when the object is collected."""
    connector = getattr(self, 'db', None)
    if connector is None:
        return
    try:
        connector.close()
    except Exception as closeError:
        # Only logged; the error is not re-raised
        logger.error(f"Error closing database connection: {closeError}")
def _initializeDatabase(self):
    """
    Create the database connector from APP_CONFIG and initialise the database system.

    Raises:
        Exception: Any error from configuration parsing, connector creation or
            initialisation is logged and re-raised unchanged.
    """
    try:
        # Read the connection settings; host, database and port have defaults
        connectionSettings = {
            "dbHost": APP_CONFIG.get("DB_APP_HOST", "_no_config_default_data"),
            "dbDatabase": APP_CONFIG.get("DB_APP_DATABASE", "app"),
            "dbUser": APP_CONFIG.get("DB_APP_USER"),
            "dbPassword": APP_CONFIG.get("DB_APP_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_APP_PORT", 5432)),
            "userId": self.userId,
        }
        self.db = DatabaseConnector(**connectionSettings)

        self.db.initDbSystem()
        logger.info(f"Database initialized successfully for user {self.userId}")
    except Exception as initError:
        logger.error(f"Failed to initialize database: {str(initError)}")
        raise
def _initRecords(self):
    """Seed the standard records: first the Root mandate, then the Admin user."""
    # Order matters: the admin user is created with the initial mandate's id
    for seedStep in (self._initRootMandate, self._initAdminUser):
        seedStep()
def _initRootMandate(self):
    """Create the "Root" mandate when no initial mandate id or no mandate records exist."""
    initialMandateId = self.getInitialId(Mandate)
    storedMandates = self.db.getRecordset(Mandate)
    if initialMandateId is not None and storedMandates:
        # A mandate already exists, nothing to seed
        return

    logger.info("Creating Root mandate")
    newMandate = self.db.recordCreate(
        Mandate,
        Mandate(name="Root", language="en", enabled=True),
    )
    logger.info(f"Root mandate created with ID {newMandate['id']}")

    # Continue working in the context of the new mandate
    self.mandateId = newMandate['id']
def _initAdminUser(self):
    """Create the "admin" SYSADMIN user when no initial user id or no user records exist."""
    initialUserId = self.getInitialId(UserInDB)
    storedUsers = self.db.getRecordset(UserInDB)
    if initialUserId is not None and storedUsers:
        # A user already exists, nothing to seed
        return

    logger.info("Creating Admin user")
    # NOTE(review): the bootstrap password is hard-coded in source.
    # Use a secure password in production!
    bootstrapAdmin = UserInDB(
        mandateId=self.getInitialId(Mandate),
        username="admin",
        email="admin@example.com",
        fullName="Administrator",
        enabled=True,
        language="en",
        privilege=UserPrivilege.SYSADMIN,
        authenticationAuthority="local",  # Using lowercase value directly
        hashedPassword=self._getPasswordHash("The 1st Poweron Admin"),
        connections=[]
    )
    newAdmin = self.db.recordCreate(UserInDB, bootstrapAdmin)
    logger.info(f"Admin user created with ID {newAdmin['id']}")

    # NOTE(review): this stores the created record (accessed like a dict here),
    # whereas elsewhere currentUser is a User object - confirm this is intended.
    self.currentUser = newAdmin
    self.userId = newAdmin.get("id")
def _uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Unified user access management function that filters data based on user privileges
and adds access control attributes.
Args:
model_class: Pydantic model class for the table
recordset: Recordset to filter based on access rules
Returns:
Filtered recordset with access control attributes
"""
# First apply access control
filteredRecords = self.access.uam(model_class, recordset)
# Then filter out database-specific fields
cleanedRecords = []
for record in filteredRecords:
# Create a new dict with only non-database fields
cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')}
cleanedRecords.append(cleanedRecord)
return cleanedRecords
def _canModify(self, model_class: type, recordId: Optional[str] = None) -> bool:
"""
Checks if the current user can modify (create/update/delete) records in a table.
Args:
model_class: Pydantic model class for the table
recordId: Optional record ID for specific record check
Returns:
Boolean indicating permission
"""
return self.access.canModify(model_class, recordId)
def getInitialId(self, model_class: type) -> Optional[str]:
    """Look up the initial ID of a table via the database connector."""
    connector = self.db
    return connector.getInitialId(model_class)
def _getPasswordHash(self, password: str) -> str:
    """Hash a plain-text password with the module's password context."""
    hashedValue = pwdContext.hash(password)
    return hashedValue
def _verifyPassword(self, plainPassword: str, hashedPassword: str) -> bool:
    """Verify a plain-text password against a stored hash using the module's password context."""
    isMatch = pwdContext.verify(plainPassword, hashedPassword)
    return isMatch
# User methods
def getUsersByMandate(self, mandateId: str) -> List[User]:
    """
    List the users of one mandate that the current user may see.

    Args:
        mandateId: Id of the mandate whose users are requested

    Returns:
        User models for every record that passes access control.
    """
    mandateUsers = self.db.getRecordset(UserInDB, recordFilter={"mandateId": mandateId})
    visibleUsers = self._uam(UserInDB, mandateUsers)
    return list(map(User.from_dict, visibleUsers))
def getUserByUsername(self, username: str) -> Optional[User]:
    """
    Find a user by exact username. No access control is applied.

    Args:
        username: Username to look for

    Returns:
        The first matching user, or None when there is no match, no users
        exist, or the lookup raised (the error is logged).
    """
    try:
        allUsers = self.db.getRecordset(UserInDB)
        if not allUsers:
            return None

        # First record whose username matches exactly
        matchingRecord = next(
            (row for row in allUsers if row.get("username") == username),
            None,
        )
        if matchingRecord is not None:
            return User.from_dict(matchingRecord)

        logger.info(f"No user found with username {username}")
        return None
    except Exception as lookupError:
        logger.error(f"Error getting user by username: {str(lookupError)}")
        return None
def getUser(self, userId: str) -> Optional[User]:
    """
    Fetch a user by id, subject to access control.

    Args:
        userId: Id of the requested user

    Returns:
        The user, or None when it does not exist, is filtered out by access
        control, or the lookup raised (the error is logged).
    """
    try:
        allUsers = self.db.getRecordset(UserInDB)
        if not allUsers:
            return None

        # Only the first record with this id is considered
        candidate = next((row for row in allUsers if row.get("id") == userId), None)
        if candidate is None:
            return None

        visible = self._uam(UserInDB, [candidate])
        return User.from_dict(visible[0]) if visible else None
    except Exception as lookupError:
        logger.error(f"Error getting user by ID: {str(lookupError)}")
        return None
def getUserConnections(self, userId: str) -> List[UserConnection]:
    """
    Load all external connections of a user.

    Args:
        userId: Id of the user whose connections are requested

    Returns:
        UserConnection objects for every record that could be converted.
        Records that fail conversion are logged and skipped; an empty list
        is returned when the query itself fails.
    """
    try:
        connectionRows = self.db.getRecordset(UserConnection, recordFilter={"userId": userId})

        converted = []
        for row in connectionRows:
            try:
                converted.append(
                    UserConnection(
                        id=row["id"],
                        userId=row["userId"],
                        authority=row.get("authority"),
                        externalId=row.get("externalId", ""),
                        externalUsername=row.get("externalUsername", ""),
                        externalEmail=row.get("externalEmail"),
                        status=row.get("status", "pending"),
                        connectedAt=row.get("connectedAt"),
                        lastChecked=row.get("lastChecked"),
                        expiresAt=row.get("expiresAt")
                    )
                )
            except Exception as conversionError:
                # One broken record must not hide the remaining ones
                logger.error(f"Error converting connection dict to object: {str(conversionError)}")
        return converted
    except Exception as queryError:
        logger.error(f"Error getting user connections: {str(queryError)}")
        return []
def addUserConnection(self, userId: str, authority: AuthAuthority, externalId: str,
                      externalUsername: str, externalEmail: Optional[str] = None,
                      status: ConnectionStatus = ConnectionStatus.PENDING) -> UserConnection:
    """
    Create and store a new external connection for a user.

    Args:
        userId: The ID of the user
        authority: The authentication authority (e.g., MSFT, GOOGLE)
        externalId: The external ID from the authority
        externalUsername: The username from the authority
        externalEmail: Optional email from the authority
        status: The connection status (defaults to PENDING)

    Returns:
        The created UserConnection object

    Raises:
        ValueError: If the user is not found (via getUser) or storing the
            connection fails; the original error text is included.
    """
    try:
        # The user must exist and be visible to the caller
        if not self.getUser(userId):
            raise ValueError(f"User not found: {userId}")

        newConnection = UserConnection(
            id=str(uuid.uuid4()),
            userId=userId,
            authority=authority,
            externalId=externalId,
            externalUsername=externalUsername,
            externalEmail=externalEmail,
            status=status,
            connectedAt=get_utc_timestamp(),
            lastChecked=get_utc_timestamp(),
            expiresAt=None  # Optional field, no expiry by default
        )
        self.db.recordCreate(UserConnection, newConnection)
        return newConnection
    except Exception as addError:
        logger.error(f"Error adding user connection: {str(addError)}")
        raise ValueError(f"Failed to add user connection: {str(addError)}")
def removeUserConnection(self, connectionId: str) -> None:
    """
    Delete a connection to an external service.

    Args:
        connectionId: Id of the connection record to delete

    Raises:
        ValueError: If the connection does not exist or the deletion fails.
    """
    try:
        existing = self.db.getRecordset(UserConnection, recordFilter={"id": connectionId})
        if existing:
            self.db.recordDelete(UserConnection, connectionId)
        else:
            raise ValueError(f"Connection {connectionId} not found")
    except Exception as removeError:
        logger.error(f"Error removing user connection: {str(removeError)}")
        raise ValueError(f"Failed to remove user connection: {str(removeError)}")
def authenticateLocalUser(self, username: str, password: str) -> Optional[User]:
    """
    Authenticate a user by username and password using local authentication.

    Args:
        username: Username of the account
        password: Plain-text password to verify

    Returns:
        The authenticated user.

    Raises:
        ValueError: If the user does not exist, is disabled, does not use
            local authentication, has no password set, or the password is wrong.
    """
    user = self.getUserByUsername(username)
    if not user:
        raise ValueError("User not found")

    if not user.enabled:
        raise ValueError("User is disabled")

    # Only accounts that authenticate locally can log in with a password
    if user.authenticationAuthority != AuthAuthority.LOCAL:
        raise ValueError("User does not have local authentication enabled")

    # Re-read the raw record: the password hash is taken from the stored record
    userRecords = self.db.getRecordset(UserInDB, recordFilter={"id": user.id})
    if not userRecords:
        # The record vanished between the two reads; report it like an unknown
        # user instead of failing with an IndexError
        raise ValueError("User not found")

    storedHash = userRecords[0].get("hashedPassword")
    if not storedHash:
        raise ValueError("User has no password set")
    if not self._verifyPassword(password, storedHash):
        raise ValueError("Invalid password")
    return user
def createUser(self, username: str, password: str = None, email: str = None,
               fullName: str = None, language: str = "en", enabled: bool = True,
               privilege: UserPrivilege = UserPrivilege.USER,
               authenticationAuthority: AuthAuthority = AuthAuthority.LOCAL,
               externalId: str = None, externalUsername: str = None,
               externalEmail: str = None) -> User:
    """
    Create a new user in the current mandate, optionally with an external connection.

    Args:
        username: Login name; converted to str and stripped
        password: Plain-text password, required for local authentication
        email: Optional e-mail address
        fullName: Optional display name
        language: Language code, defaults to "en"
        enabled: Whether the account is active
        privilege: Privilege level of the new user
        authenticationAuthority: Authority that authenticates the user
        externalId: External id; a connection is added only when this and
            externalUsername are both given
        externalUsername: External username for the connection
        externalEmail: Optional external e-mail for the connection

    Returns:
        The created user, re-read from the database.

    Raises:
        ValueError: On validation errors or when creation/retrieval fails.
    """
    try:
        username = str(username).strip()

        # Local accounts need a non-blank string password
        if authenticationAuthority == AuthAuthority.LOCAL:
            if not password:
                raise ValueError("Password is required for local authentication")
            if not isinstance(password, str):
                raise ValueError("Password must be a string")
            if not password.strip():
                raise ValueError("Password cannot be empty")

        newUser = UserInDB(
            username=username,
            email=email,
            fullName=fullName,
            language=language,
            mandateId=self.mandateId,
            enabled=enabled,
            privilege=privilege,
            authenticationAuthority=authenticationAuthority,
            hashedPassword=self._getPasswordHash(password) if password else None,
            connections=[]
        )

        storedRecord = self.db.recordCreate(UserInDB, newUser)
        if not (storedRecord and storedRecord.get("id")):
            raise ValueError("Failed to create user record")
        newUserId = storedRecord["id"]

        # Link the external identity when both identifying values are present
        if externalId and externalUsername:
            self.addUserConnection(
                newUserId,
                authenticationAuthority,
                externalId,
                externalUsername,
                externalEmail
            )

        # Read the record back by the id that was just returned
        reloaded = self.db.getRecordset(UserInDB, recordFilter={"id": storedRecord["id"]})
        if not reloaded:
            raise ValueError("Failed to retrieve created user")
        return User.from_dict(reloaded[0])
    except ValueError as validationError:
        logger.error(f"Error creating user: {str(validationError)}")
        raise
    except Exception as unexpectedError:
        logger.error(f"Unexpected error creating user: {str(unexpectedError)}")
        raise ValueError(f"Failed to create user: {str(unexpectedError)}")
def updateUser(self, userId: str, updateData: Dict[str, Any]) -> User:
    """
    Merge new values into a user and store the result.

    Args:
        userId: Id of the user to update
        updateData: Field values that overwrite the current ones

    Returns:
        The user as re-read after the update.

    Raises:
        ValueError: If the user is not found (via getUser), or the update or
            the re-read fails.
    """
    try:
        currentState = self.getUser(userId)
        if not currentState:
            raise ValueError(f"User {userId} not found")

        # Overlay the requested changes on the current field values
        mergedFields = currentState.to_dict()
        mergedFields.update(updateData)
        self.db.recordModify(UserInDB, userId, User.from_dict(mergedFields))

        refreshed = self.getUser(userId)
        if not refreshed:
            raise ValueError("Failed to retrieve updated user")
        return refreshed
    except Exception as updateError:
        logger.error(f"Error updating user: {str(updateError)}")
        raise ValueError(f"Failed to update user: {str(updateError)}")
def disableUser(self, userId: str) -> User:
    """Set a user's enabled flag to False via updateUser and return the updated user."""
    changes = {"enabled": False}
    return self.updateUser(userId, changes)
def enableUser(self, userId: str) -> User:
    """Set a user's enabled flag to True via updateUser and return the updated user."""
    changes = {"enabled": True}
    return self.updateUser(userId, changes)
def _deleteUserReferencedData(self, userId: str) -> None:
    """
    Delete the auth events, tokens and connections that belong to a user.

    Args:
        userId: Id of the user whose dependent records are removed

    Raises:
        Exception: Any database error is logged and re-raised unchanged.
    """
    try:
        # Same procedure for every dependent table: auth events, tokens, connections
        for dependentModel in (AuthEvent, Token, UserConnection):
            dependentRows = self.db.getRecordset(dependentModel, recordFilter={"userId": userId})
            for row in dependentRows:
                self.db.recordDelete(dependentModel, row["id"])
        logger.info(f"All referenced data for user {userId} has been deleted")
    except Exception as cleanupError:
        logger.error(f"Error deleting referenced data for user {userId}: {str(cleanupError)}")
        raise
def deleteUser(self, userId: str) -> bool:
    """
    Delete a user together with its dependent records.

    Args:
        userId: Id of the user to delete

    Returns:
        True when the user was deleted.

    Raises:
        ValueError: If the user is not found, the caller lacks permission, or
            a deletion step fails. Every error, including the internal
            PermissionError, is re-raised as ValueError.
    """
    try:
        if not self.getUser(userId):
            raise ValueError(f"User {userId} not found")
        if not self._canModify(UserInDB, userId):
            raise PermissionError(f"No permission to delete user {userId}")

        # Dependent records go first, then the user record itself
        self._deleteUserReferencedData(userId)
        if not self.db.recordDelete(UserInDB, userId):
            raise ValueError(f"Failed to delete user {userId}")

        logger.info(f"User {userId} successfully deleted")
        return True
    except Exception as deleteError:
        logger.error(f"Error deleting user: {str(deleteError)}")
        raise ValueError(f"Failed to delete user: {str(deleteError)}")
# Mandate methods
def getAllMandates(self) -> List[Mandate]:
    """Return every mandate that passes access control, as Mandate models."""
    visibleMandates = self._uam(Mandate, self.db.getRecordset(Mandate))
    return list(map(Mandate.from_dict, visibleMandates))
def getMandate(self, mandateId: str) -> Optional[Mandate]:
    """
    Fetch one mandate by id, subject to access control.

    Args:
        mandateId: Id of the requested mandate

    Returns:
        The mandate, or None when it does not exist or is filtered out.
    """
    matches = self.db.getRecordset(Mandate, recordFilter={"id": mandateId})
    # Access control is only consulted when a record was found
    visible = self._uam(Mandate, matches) if matches else []
    return Mandate.from_dict(visible[0]) if visible else None
def createMandate(self, name: str, language: str = "en") -> Mandate:
    """
    Create a new mandate.

    Args:
        name: Name of the mandate
        language: Language code, defaults to "en"

    Returns:
        The created mandate.

    Raises:
        PermissionError: If the current user may not modify mandates.
        ValueError: If the database returns no record or a record without id.
    """
    if not self._canModify(Mandate):
        raise PermissionError("No permission to create mandates")

    storedRecord = self.db.recordCreate(Mandate, Mandate(name=name, language=language))
    if storedRecord and storedRecord.get("id"):
        return Mandate.from_dict(storedRecord)
    raise ValueError("Failed to create mandate record")
def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate:
    """
    Merge new values into a mandate and store the result.

    Args:
        mandateId: Id of the mandate to update
        updateData: Field values that overwrite the current ones

    Returns:
        The mandate as re-read after the update.

    Raises:
        ValueError: If permission is missing, the mandate is not found, or
            the update or the re-read fails. Every error, including the
            internal PermissionError, is re-raised as ValueError.
    """
    try:
        # Permission is checked before the record is loaded
        if not self._canModify(Mandate, mandateId):
            raise PermissionError(f"No permission to update mandate {mandateId}")

        currentState = self.getMandate(mandateId)
        if not currentState:
            raise ValueError(f"Mandate {mandateId} not found")

        # Overlay the requested changes on the current field values
        mergedFields = currentState.to_dict()
        mergedFields.update(updateData)
        self.db.recordModify(Mandate, mandateId, Mandate.from_dict(mergedFields))

        refreshed = self.getMandate(mandateId)
        if not refreshed:
            raise ValueError("Failed to retrieve updated mandate")
        return refreshed
    except Exception as updateError:
        logger.error(f"Error updating mandate: {str(updateError)}")
        raise ValueError(f"Failed to update mandate: {str(updateError)}")
def deleteMandate(self, mandateId: str) -> bool:
    """
    Delete a mandate that has no users.

    Args:
        mandateId: Id of the mandate to delete

    Returns:
        False when the mandate is not found (or not visible); otherwise the
        result reported by the database delete.

    Raises:
        ValueError: If permission is missing, the mandate still has users, or
            the deletion fails. Every error, including the internal
            PermissionError, is re-raised as ValueError.
    """
    try:
        if not self.getMandate(mandateId):
            return False
        if not self._canModify(Mandate, mandateId):
            raise PermissionError(f"No permission to delete mandate {mandateId}")

        # A mandate with users visible to the caller must not be removed
        if self.getUsersByMandate(mandateId):
            raise ValueError(f"Cannot delete mandate {mandateId} with existing users")

        return self.db.recordDelete(Mandate, mandateId)
    except Exception as deleteError:
        logger.error(f"Error deleting mandate: {str(deleteError)}")
        raise ValueError(f"Failed to delete mandate: {str(deleteError)}")
def _getInitialUser(self) -> Optional[Dict[str, Any]]:
    """
    Read the initial user record straight from the database, bypassing access control.

    Returns:
        The raw record, or None when there is no initial id, no matching
        record, or the lookup raised (the error is logged).
    """
    try:
        firstUserId = self.getInitialId(UserInDB)
        if not firstUserId:
            return None
        matches = self.db.getRecordset(UserInDB, recordFilter={"id": firstUserId})
        if matches:
            return matches[0]
        return None
    except Exception as lookupError:
        logger.error(f"Error getting initial user: {str(lookupError)}")
        return None
def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]:
    """
    Check whether a username is available for registration.

    The username is normalised the same way createUser does it (converted to
    str and stripped), so a name with surrounding whitespace is checked
    against the value that would actually be stored.

    Args:
        checkData: Request data; only the "username" key is read.

    Returns:
        A dict with "available" (bool) and "message" (str). Errors during the
        check are logged and reported as not available.
    """
    try:
        rawUsername = checkData.get("username")
        # Normalise like createUser so both agree on what the username is
        username = str(rawUsername).strip() if rawUsername else ""
        if not username:
            return {
                "available": False,
                "message": "Username is required"
            }

        # Any existing user with this name makes it unavailable
        if self.getUserByUsername(username) is not None:
            return {
                "available": False,
                "message": "Username is already taken"
            }
        return {
            "available": True,
            "message": "Username is available"
        }
    except Exception as e:
        logger.error(f"Error checking username availability: {str(e)}")
        return {
            "available": False,
            "message": f"Error checking username availability: {str(e)}"
        }
def saveAccessToken(self, token: Token, replace_existing: bool = True) -> None:
    """
    Store an access token for the current user.

    Args:
        token: Token to store; must NOT carry a connectionId. Its userId is
            set to the current user, and id / createdAt are filled in when
            missing.
        replace_existing: When True, older access tokens of the same user and
            authority are deleted first. A failure of that cleanup is only
            logged; the new token is saved regardless.

    Raises:
        ValueError: If the token has a connectionId or no user context is set.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # Connection tokens are handled by saveConnectionToken
        if token.connectionId:
            raise ValueError("Access tokens cannot have connectionId - use saveConnectionToken instead")

        if not (self.currentUser and self.currentUser.id):
            raise ValueError("No valid user context available for token storage")

        # The token always belongs to the current user
        token.userId = self.currentUser.id

        # Fill in missing id and creation time
        if not token.id:
            token.id = str(uuid.uuid4())
        if not token.createdAt:
            token.createdAt = get_utc_timestamp()

        if replace_existing:
            try:
                # connectionId None limits the query to access tokens
                previousTokens = self.db.getRecordset(Token, recordFilter={
                    "userId": self.currentUser.id,
                    "authority": token.authority,
                    "connectionId": None
                })
                removedCount = 0
                for previous in previousTokens:
                    if previous["id"] == token.id:
                        # Never remove the token that is about to be saved
                        continue
                    self.db.recordDelete(Token, previous["id"])
                    removedCount += 1
                if removedCount > 0:
                    logger.info(f"Replaced {removedCount} old access tokens for user {self.currentUser.id} and authority {token.authority}")
            except Exception as cleanupError:
                # Best effort only: saving the new token continues
                logger.warning(f"Failed to delete old access tokens for user {self.currentUser.id} and authority {token.authority}: {str(cleanupError)}")

        tokenRecord = token.dict()
        tokenRecord["userId"] = self.currentUser.id
        self.db.recordCreate(Token, tokenRecord)
    except Exception as saveError:
        logger.error(f"Error saving access token: {str(saveError)}")
        raise
def saveConnectionToken(self, token: Token, replace_existing: bool = True) -> None:
    """
    Store a token that belongs to an external connection.

    Args:
        token: Token to store; must carry a connectionId. Its userId is set
            to the current user, and id / createdAt are filled in when missing.
        replace_existing: When True, older tokens of the same connectionId
            are deleted first. A failure of that cleanup is only logged; the
            new token is saved regardless.

    Raises:
        ValueError: If the token has no connectionId or no user context is set.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # Plain access tokens are handled by saveAccessToken
        if not token.connectionId:
            raise ValueError("Connection tokens must have connectionId - use saveAccessToken instead")

        if not (self.currentUser and self.currentUser.id):
            raise ValueError("No valid user context available for token storage")

        # The token always belongs to the current user
        token.userId = self.currentUser.id

        # Fill in missing id and creation time
        if not token.id:
            token.id = str(uuid.uuid4())
        if not token.createdAt:
            token.createdAt = get_utc_timestamp()

        if replace_existing:
            try:
                previousTokens = self.db.getRecordset(Token, recordFilter={
                    "connectionId": token.connectionId
                })
                removedCount = 0
                for previous in previousTokens:
                    if previous["id"] == token.id:
                        # Never remove the token that is about to be saved
                        continue
                    self.db.recordDelete(Token, previous["id"])
                    removedCount += 1
                if removedCount > 0:
                    logger.info(f"Replaced {removedCount} old tokens for connectionId {token.connectionId}")
            except Exception as cleanupError:
                # Best effort only: saving the new token continues
                logger.warning(f"Failed to delete old tokens for connectionId {token.connectionId}: {str(cleanupError)}")

        tokenRecord = token.dict()
        tokenRecord["userId"] = self.currentUser.id
        self.db.recordCreate(Token, tokenRecord)
    except Exception as saveError:
        logger.error(f"Error saving connection token: {str(saveError)}")
        raise
def getAccessToken(self, authority: str, auto_refresh: bool = True) -> Optional[Token]:
    """
    Get the most recently created access token of the current user for an authority.

    Args:
        authority: Authority the token was issued by
        auto_refresh: When True, an expired token is refreshed through
            TokenManager and the refreshed token is saved and returned.

    Returns:
        The latest token; the refreshed token when the latest one was expired
        and refreshing succeeded; None when no token exists, the token is
        expired and cannot or must not be refreshed, or any error occurred
        (errors are logged, not raised).
    """
    try:
        if not self.currentUser or not self.currentUser.id:
            raise ValueError("No valid user context available for token retrieval")

        # connectionId None limits the query to access tokens
        tokens = self.db.getRecordset(Token, recordFilter={
            "userId": self.currentUser.id,
            "authority": authority,
            "connectionId": None
        })
        if not tokens:
            return None

        # Pick the newest token. A missing or None createdAt counts as 0 so the
        # comparison never mixes str/None with the numeric timestamps, which
        # would raise a TypeError.
        latest_row = max(tokens, key=lambda row: row.get("createdAt") or 0)
        latest_token = Token(**latest_row)

        is_expired = bool(latest_token.expiresAt) and latest_token.expiresAt < get_utc_timestamp()
        if not is_expired:
            return latest_token

        if not auto_refresh:
            logger.warning(f"Access token for {authority} is expired (expiresAt: {latest_token.expiresAt})")
            return None

        # Import TokenManager here to avoid circular imports
        from modules.security.tokenManager import TokenManager
        refreshed_token = TokenManager().refresh_token(latest_token)
        if not refreshed_token:
            logger.warning(f"Failed to refresh expired access token for {authority}")
            return None

        # Saving replaces the older access tokens by default
        self.saveAccessToken(refreshed_token)
        return refreshed_token
    except Exception as e:
        logger.error(f"Error getting access token: {str(e)}")
        return None
def getConnectionToken(self, connectionId: str, auto_refresh: bool = True) -> Optional[Token]:
    """
    Get the token of a connection, refreshing it proactively when it is about to expire.

    Args:
        connectionId: Id of the connection the token belongs to
        auto_refresh: When True, a token that is expired or expires within
            30 minutes is refreshed through TokenManager, saved and returned.

    Returns:
        The token with the latest expiry; the refreshed token when a refresh
        was needed and succeeded; None when no token exists, the token is
        inside the 30-minute window and cannot or must not be refreshed, or
        any error occurred (errors are logged, not raised).
    """
    try:
        if not connectionId:
            raise ValueError("connectionId is required for getConnectionToken")

        tokens = self.db.getRecordset(Token, recordFilter={
            "connectionId": connectionId
        })
        if not tokens:
            logger.warning(f"No connection token found for connectionId: {connectionId}")
            return None

        # Pick the token with the latest expiry. A missing or None expiresAt
        # counts as 0 so None is never compared with a number (TypeError).
        latest_row = max(tokens, key=lambda row: row.get("expiresAt") or 0)
        latest_token = Token(**latest_row)

        current_time = get_utc_timestamp()
        thirty_minutes = 30 * 60  # 30 minutes in seconds
        needs_refresh = bool(latest_token.expiresAt) and latest_token.expiresAt < (current_time + thirty_minutes)
        if not needs_refresh:
            return latest_token

        # NOTE(review): without auto_refresh, or when the refresh fails, a token
        # that is still valid but inside the 30-minute window is not returned.
        if not auto_refresh:
            logger.warning(f"Connection token for connectionId {connectionId} expires soon (expiresAt: {latest_token.expiresAt})")
            return None

        # Import TokenManager here to avoid circular imports
        from modules.security.tokenManager import TokenManager
        refreshed_token = TokenManager().refresh_token(latest_token)
        if not refreshed_token:
            logger.warning(f"Token refresh failed for connectionId {connectionId}")
            return None

        # Saving replaces the older tokens of this connection by default
        self.saveConnectionToken(refreshed_token)
        logger.info(f"Proactively refreshed connection token for connectionId {connectionId} (expired in {latest_token.expiresAt - current_time} seconds)")
        return refreshed_token
    except Exception as e:
        logger.error(f"Error getting connection token for connectionId {connectionId}: {str(e)}")
        return None
def deleteAccessToken(self, authority: str) -> None:
    """
    Delete all access tokens of the current user for one authority.

    Args:
        authority: Authority whose access tokens are removed

    Raises:
        ValueError: If no user context is set.
        Exception: Any database error is logged and re-raised unchanged.
    """
    try:
        if not (self.currentUser and self.currentUser.id):
            raise ValueError("No valid user context available for token deletion")

        # connectionId None limits the deletion to access tokens
        tokenFilter = {
            "userId": self.currentUser.id,
            "authority": authority,
            "connectionId": None
        }
        for accessToken in self.db.getRecordset(Token, recordFilter=tokenFilter):
            self.db.recordDelete(Token, accessToken["id"])
    except Exception as deleteError:
        logger.error(f"Error deleting access token: {str(deleteError)}")
        raise
def deleteConnectionTokenByConnectionId(self, connectionId: str) -> None:
    """
    Delete every token stored for one connection.

    Args:
        connectionId: Id of the connection whose tokens are removed

    Raises:
        ValueError: If connectionId is empty.
        Exception: Any database error is logged and re-raised unchanged.
    """
    try:
        if not connectionId:
            raise ValueError("connectionId is required for deleteConnectionTokenByConnectionId")

        tokenFilter = {"connectionId": connectionId}
        for connectionToken in self.db.getRecordset(Token, recordFilter=tokenFilter):
            self.db.recordDelete(Token, connectionToken["id"])
    except Exception as deleteError:
        logger.error(f"Error deleting connection token for connectionId {connectionId}: {str(deleteError)}")
        raise
def cleanupExpiredTokens(self) -> int:
    """
    Delete every stored token whose expiry time lies in the past.

    Tokens without an expiresAt value are kept.

    Returns:
        Number of deleted tokens; 0 when nothing was expired or an error
        occurred (errors are logged, not raised).
    """
    try:
        # get_utc_timestamp is imported at module level; no local import needed
        current_time = get_utc_timestamp()
        cleaned_count = 0

        for token_data in self.db.getRecordset(Token, recordFilter={}):
            expires_at = token_data.get("expiresAt")
            if expires_at and expires_at < current_time:
                self.db.recordDelete(Token, token_data["id"])
                cleaned_count += 1

        if cleaned_count > 0:
            logger.info(f"Cleaned up {cleaned_count} expired tokens")
        return cleaned_count
    except Exception as e:
        logger.error(f"Error cleaning up expired tokens: {str(e)}")
        return 0
def logout(self) -> None:
    """
    Log out the current user by dropping the user context.

    Resets currentUser, userId, mandateId and access to None and passes an
    empty string to the database connector as its context. No stored tokens
    are deleted here.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        # Drop every reference to the logged-in user
        self.currentUser = self.userId = self.mandateId = self.access = None

        # The connector may be missing if database initialisation never completed
        if hasattr(self, 'db'):
            self.db.updateContext("")

        logger.info("User logged out successfully")
    except Exception as logoutError:
        logger.error(f"Error during logout: {str(logoutError)}")
        raise
# Data Neutralization methods
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
    """
    Load the data neutralization configuration of the current mandate.

    Returns:
        The first configuration that passes access control, or None when
        there is none or the lookup raised (the error is logged).
    """
    try:
        mandateConfigs = self.db.getRecordset(DataNeutraliserConfig, recordFilter={"mandateId": self.mandateId})
        # Access control is only consulted when a record was found
        visibleConfigs = self._uam(DataNeutraliserConfig, mandateConfigs) if mandateConfigs else []
        if visibleConfigs:
            return DataNeutraliserConfig.from_dict(visibleConfigs[0])
        return None
    except Exception as lookupError:
        logger.error(f"Error getting neutralization config: {str(lookupError)}")
        return None
def createOrUpdateNeutralizationConfig(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
    """
    Create the neutralization configuration of the current mandate, or update it if it exists.

    Args:
        config_data: Field values for the configuration. The dict is not
            modified; mandateId and userId are added to a copy when a new
            configuration is created.

    Returns:
        The updated configuration model, or the newly created configuration
        as returned by the database.

    Raises:
        ValueError: If creating or updating fails; the original error text
            is included.
    """
    try:
        existing_config = self.getNeutralizationConfig()

        if existing_config:
            # Overlay the new values on the stored ones and stamp the change time
            update_data = existing_config.to_dict()
            update_data.update(config_data)
            update_data["updatedAt"] = get_utc_timestamp()

            updated_config = DataNeutraliserConfig.from_dict(update_data)
            self.db.recordModify(DataNeutraliserConfig, existing_config.id, updated_config)
            return updated_config

        # Work on a copy so the caller's dict is not changed as a side effect
        create_data = dict(config_data)
        create_data["mandateId"] = self.mandateId
        create_data["userId"] = self.userId

        new_config = DataNeutraliserConfig.from_dict(create_data)
        created_record = self.db.recordCreate(DataNeutraliserConfig, new_config)
        return DataNeutraliserConfig.from_dict(created_record)
    except Exception as e:
        logger.error(f"Error creating/updating neutralization config: {str(e)}")
        raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Neutralize text content and persist the placeholder-to-original mappings.

    The text is processed by DataAnonymizer (seeded with the names from the
    neutralization config, one name per line). For every mapping entry the
    method either reuses a placeholder already stored for the same original
    text (rewriting the neutralized text accordingly) or stores a new
    attribute record under the UUID the anonymizer generated.

    Args:
        text: The content to neutralize.
        file_id: Optional file ID stored on newly created attribute records.

    Returns:
        Dict with the keys "neutralized_text", "attributes", "mapping",
        "replaced_fields" and "processed_info".

    Raises:
        ValueError: If anything fails; the original exception is attached
            as the cause.
    """
    try:
        from modules.neutralizer.neutralizer import DataAnonymizer

        # Custom names come from the config as a newline-separated string
        config = self.getNeutralizationConfig()
        raw_names = getattr(config, 'namesToParse', None) if config else None
        names_to_parse = []
        if raw_names:
            names_to_parse = [name.strip() for name in raw_names.split('\n') if name.strip()]

        anonymizer = DataAnonymizer(names_to_parse=names_to_parse)
        result = anonymizer.process_content(text, 'text')

        # NOTE(review): reused entries are DataNeutralizerAttributes models,
        # while new entries are whatever recordCreate returns (elsewhere in
        # this file that value is fed to from_dict, so presumably a dict) -
        # confirm whether callers expect one uniform type.
        stored_attributes = []

        # Iterate over a snapshot: the loop rewrites values of result.mapping
        for original_text, neutralized_text in list(result.mapping.items()):
            # Placeholders have the form [type.uuid]; anything else keeps
            # the defaults below
            pattern_type = "unknown"
            placeholder_uuid = None
            if neutralized_text.startswith("[") and neutralized_text.endswith("]"):
                inner = neutralized_text[1:-1]
                if "." in inner:
                    pattern_type, placeholder_uuid = inner.split(".", 1)

            existing_attribute = self.getExistingPlaceholder(original_text)
            if existing_attribute:
                # Same original text seen before: swap the freshly generated
                # placeholder for the stored one so the value stays stable
                reused_placeholder = f"[{existing_attribute.patternType}.{existing_attribute.id}]"
                result.data = result.data.replace(neutralized_text, reused_placeholder)
                result.mapping[original_text] = reused_placeholder
                stored_attributes.append(existing_attribute)
            else:
                # New original text: persist it under the anonymizer's UUID so
                # the placeholder already present in result.data stays valid
                attribute_data = {
                    "id": placeholder_uuid,
                    "mandateId": self.mandateId,
                    "userId": self.userId,
                    "originalText": original_text,
                    "fileId": file_id,
                    "patternType": pattern_type
                }
                attribute = DataNeutralizerAttributes.from_dict(attribute_data)
                created_attribute = self.db.recordCreate(DataNeutralizerAttributes, attribute)
                stored_attributes.append(created_attribute)

        return {
            "neutralized_text": result.data,
            "attributes": stored_attributes,
            "mapping": result.mapping,
            "replaced_fields": result.replaced_fields,
            "processed_info": result.processed_info
        }
    except Exception as e:
        logger.error(f"Error neutralizing text: {str(e)}")
        raise ValueError(f"Failed to neutralize text: {str(e)}") from e
def getExistingPlaceholder(self, original_text: str) -> Optional[DataNeutralizerAttributes]:
    """
    Find the attribute record already stored for an original text.

    The lookup is restricted to the current mandate and user and requires
    an exact match on originalText.

    Args:
        original_text: The un-neutralized text to look up.

    Returns:
        The first matching record as a DataNeutralizerAttributes model, or
        None when nothing matches or an error occurs (errors are logged).
    """
    try:
        lookup = {
            "mandateId": self.mandateId,
            "userId": self.userId,
            "originalText": original_text
        }
        matches = self.db.getRecordset(DataNeutralizerAttributes, recordFilter=lookup)
        if not matches:
            return None
        return DataNeutralizerAttributes.from_dict(matches[0])
    except Exception as e:
        logger.error(f"Error getting existing placeholder: {str(e)}")
        return None
def getNeutralizationAttributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
    """
    List the neutralization attributes of the current mandate.

    Args:
        file_id: When given (and non-empty), only attributes stored for this
            file are returned.

    Returns:
        The records that survive self._uam, converted to
        DataNeutralizerAttributes models; an empty list on any error
        (errors are logged).
    """
    try:
        criteria = {"mandateId": self.mandateId}
        if file_id:
            criteria["fileId"] = file_id

        records = self.db.getRecordset(DataNeutralizerAttributes, recordFilter=criteria)
        visible = self._uam(DataNeutralizerAttributes, records)

        models = []
        for record in visible:
            models.append(DataNeutralizerAttributes.from_dict(record))
        return models
    except Exception as e:
        logger.error(f"Error getting neutralization attributes: {str(e)}")
        return []
def resolveNeutralizedText(self, text: str) -> str:
    """
    Replace [type.uuid] placeholders in neutralized text with their originals.

    The UUID part of a placeholder is the ID of the attribute record that
    holds the original text; records are looked up within the current
    mandate. The substitution is a single pass over the input, so restored
    text is never re-scanned for placeholders, and every distinct UUID is
    looked up only once per call.

    Args:
        text: Text that may contain placeholders.

    Returns:
        The text with all resolvable placeholders replaced. Placeholders
        without a stored original are left in place (a warning is logged).
        On any error the input text is returned unchanged.
    """
    try:
        placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
        # uid -> original text, or None when the uid cannot be resolved
        originals_by_uid = {}

        def _lookupOriginal(uid):
            if uid not in originals_by_uid:
                attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
                    "mandateId": self.mandateId,
                    "id": uid
                })
                if attributes:
                    originals_by_uid[uid] = attributes[0]["originalText"]
                else:
                    logger.warning(f"No attribute found for UID {uid}")
                    originals_by_uid[uid] = None
            return originals_by_uid[uid]

        def _restore(match):
            original = _lookupOriginal(match.group(2))
            # Keep the placeholder when there is nothing to restore; never
            # hand None to re.sub as the replacement
            return match.group(0) if original is None else original

        return re.sub(placeholder_pattern, _restore, text)
    except Exception as e:
        logger.error(f"Error resolving neutralized text: {str(e)}")
        return text
def deleteNeutralizationAttributes(self, file_id: str) -> bool:
    """
    Remove every neutralization attribute stored for one file.

    Only records of the current mandate with a matching fileId are deleted,
    one record at a time.

    Args:
        file_id: ID of the file whose attributes are removed.

    Returns:
        True when all matching records were deleted (including the case of
        no matches), False when an error occurred (the error is logged).
    """
    try:
        file_records = self.db.getRecordset(
            DataNeutralizerAttributes,
            recordFilter={
                "mandateId": self.mandateId,
                "fileId": file_id
            }
        )
        for record in file_records:
            self.db.recordDelete(DataNeutralizerAttributes, record["id"])
        logger.info(f"Deleted {len(file_records)} neutralization attributes for file {file_id}")
    except Exception as e:
        logger.error(f"Error deleting neutralization attributes: {str(e)}")
        return False
    return True
# Public module-level functions
def getInterface(currentUser: User) -> AppObjects:
    """
    Return the AppObjects instance belonging to the given user.

    Instances are cached in _gatewayInterfaces under a key built from the
    user's mandate ID and user ID; a new instance is created only on the
    first request for that key.

    Raises:
        ValueError: If no user is supplied.
    """
    if not currentUser:
        raise ValueError("Invalid user context: user is required")

    cache_key = f"{currentUser.mandateId}_{currentUser.id}"

    # Cache hit: reuse the interface built earlier for this mandate/user pair
    if cache_key in _gatewayInterfaces:
        return _gatewayInterfaces[cache_key]

    instance = AppObjects(currentUser)
    _gatewayInterfaces[cache_key] = instance
    return instance
def getRootUser() -> User:
    """
    Load the root user from the database.

    The root user is the UserInDB record whose ID is reported by
    getInitialId(UserInDB). A throw-away AppObjects instance without user
    context is used for the lookup.

    Raises:
        ValueError: If the initial ID or the record is missing, or if any
            other error occurs during the lookup.
    """
    try:
        bootstrap = AppObjects()

        root_id = bootstrap.getInitialId(UserInDB)
        if not root_id:
            raise ValueError("No initial user ID found in database")

        candidates = bootstrap.db.getRecordset(UserInDB, recordFilter={"id": root_id})
        if not candidates:
            raise ValueError("Initial user not found in database")

        # Hand back a User model built from the stored record
        return User.parse_obj(candidates[0])
    except Exception as e:
        logger.error(f"Error getting root user: {str(e)}")
        raise ValueError(f"Failed to get root user: {str(e)}")
def getRootInterface() -> AppObjects:
    """
    Return the shared AppObjects instance bound to the root user.

    The instance is created on first use from getRootUser() and kept in the
    module-level _rootAppObjects for all later calls.
    """
    global _rootAppObjects

    # Already built on an earlier call: reuse it
    if _rootAppObjects is not None:
        return _rootAppObjects

    _rootAppObjects = AppObjects(getRootUser())
    return _rootAppObjects