1039 lines
No EOL
42 KiB
Python
1039 lines
No EOL
42 KiB
Python
"""
|
|
Interface to Management database and AI Connectors.
|
|
Uses the JSON connector for data access with added language support.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime, UTC
|
|
from typing import Dict, Any, List, Optional, Union
|
|
|
|
import hashlib
|
|
|
|
from modules.interfaces.interfaceComponentAccess import ComponentAccess
|
|
from modules.interfaces.interfaceComponentModel import (
|
|
FilePreview, Prompt, FileItem, FileData, VoiceSettings
|
|
)
|
|
from modules.interfaces.interfaceAppModel import User, Mandate
|
|
|
|
# DYNAMIC PART: Connectors to the Interface
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
|
|
# Basic Configurations
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton factory for Management instances with AI service per context
|
|
_instancesManagement = {}
|
|
|
|
# Custom exceptions for file handling
|
|
class FileError(Exception):
|
|
"""Base class for file handling exceptions."""
|
|
pass
|
|
|
|
class FileNotFoundError(FileError):
|
|
"""Exception raised when a file is not found."""
|
|
pass
|
|
|
|
class FileStorageError(FileError):
|
|
"""Exception raised when there's an error storing a file."""
|
|
pass
|
|
|
|
class FilePermissionError(FileError):
|
|
"""Exception raised when there's a permission issue with a file."""
|
|
pass
|
|
|
|
class FileDeletionError(FileError):
|
|
"""Exception raised when there's an error deleting a file."""
|
|
pass
|
|
|
|
class ComponentObjects:
|
|
"""
|
|
Interface to Management database and AI Connectors.
|
|
Uses the JSON connector for data access with added language support.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initializes the Management Interface."""
|
|
# Initialize variables first
|
|
self.currentUser: Optional[User] = None
|
|
self.userId: Optional[str] = None
|
|
self.access: Optional[ComponentAccess] = None # Will be set when user context is provided
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
# Initialize standard records if needed
|
|
self._initRecords()
|
|
|
|
def setUserContext(self, currentUser: User):
|
|
"""Sets the user context for the interface."""
|
|
if not currentUser:
|
|
logger.info("Initializing interface without user context")
|
|
return
|
|
|
|
self.currentUser = currentUser # Store User object directly
|
|
self.userId = currentUser.id
|
|
|
|
if not self.userId:
|
|
raise ValueError("Invalid user context: id is required")
|
|
|
|
# Add language settings
|
|
self.userLanguage = currentUser.language # Default user language
|
|
|
|
# Initialize access control with user context
|
|
self.access = ComponentAccess(self.currentUser, self.db)
|
|
|
|
# Update database context
|
|
self.db.updateContext(self.userId)
|
|
|
|
def __del__(self):
|
|
"""Cleanup method to close database connection."""
|
|
if hasattr(self, 'db') and self.db is not None:
|
|
try:
|
|
self.db.close()
|
|
except Exception as e:
|
|
logger.error(f"Error closing database connection: {e}")
|
|
|
|
logger.debug(f"User context set: userId={self.userId}")
|
|
|
|
def _initializeDatabase(self):
|
|
"""Initializes the database connection directly."""
|
|
try:
|
|
# Get configuration values with defaults
|
|
dbHost = APP_CONFIG.get("DB_MANAGEMENT_HOST", "_no_config_default_data")
|
|
dbDatabase = APP_CONFIG.get("DB_MANAGEMENT_DATABASE", "management")
|
|
dbUser = APP_CONFIG.get("DB_MANAGEMENT_USER")
|
|
dbPassword = APP_CONFIG.get("DB_MANAGEMENT_PASSWORD_SECRET")
|
|
dbPort = int(APP_CONFIG.get("DB_MANAGEMENT_PORT"))
|
|
|
|
# Create database connector directly
|
|
self.db = DatabaseConnector(
|
|
dbHost=dbHost,
|
|
dbDatabase=dbDatabase,
|
|
dbUser=dbUser,
|
|
dbPassword=dbPassword,
|
|
dbPort=dbPort,
|
|
userId=self.userId if hasattr(self, 'userId') else None
|
|
)
|
|
|
|
# Initialize database system
|
|
self.db.initDbSystem()
|
|
|
|
logger.info("Database initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize database: {str(e)}")
|
|
raise
|
|
|
|
def _initRecords(self):
|
|
"""Initializes standard records in the database if they don't exist."""
|
|
try:
|
|
# Initialize standard prompts
|
|
self._initializeStandardPrompts()
|
|
|
|
# Add other record initializations here
|
|
|
|
logger.info("Standard records initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize standard records: {str(e)}")
|
|
# Don't raise the error, just log it
|
|
# This allows the interface to be created even if initialization fails
|
|
|
|
def _initializeStandardPrompts(self):
|
|
"""Initializes standard prompts if they don't exist yet."""
|
|
try:
|
|
# Check if any prompts exist
|
|
existingPrompts = self.db.getRecordset(Prompt)
|
|
if existingPrompts:
|
|
logger.info("Prompts already exist, skipping initialization")
|
|
return
|
|
|
|
# Get the root interface to access the initial mandate ID
|
|
from modules.interfaces.interfaceAppObjects import getRootInterface
|
|
rootInterface = getRootInterface()
|
|
|
|
# Get initial mandate ID through the root interface
|
|
mandateId = rootInterface.getInitialId(Mandate)
|
|
if not mandateId:
|
|
logger.error("No initial mandate ID found")
|
|
return
|
|
|
|
# Get root user for initialization
|
|
rootUser = rootInterface.getUserByUsername("admin")
|
|
if not rootUser:
|
|
logger.error("Root user not found for initialization")
|
|
return
|
|
|
|
# Store current user context if it exists
|
|
currentUser = self.currentUser
|
|
|
|
# Set user context to root user for initialization
|
|
self.setUserContext(rootUser)
|
|
|
|
# Define standard prompts
|
|
standardPrompts = [
|
|
Prompt(
|
|
name="Market Research",
|
|
content="Research the current market trends and developments in [TOPIC]. Collect information about leading companies, innovative products or services, and current challenges. Present the results in a structured overview with relevant data and sources.",
|
|
mandateId=mandateId
|
|
),
|
|
Prompt(
|
|
name="Data Analysis",
|
|
content="Analyze the attached dataset on [TOPIC] and identify the most important trends, patterns, and anomalies. Perform statistical calculations to support your findings. Present the results in a clearly structured analysis and draw relevant conclusions.",
|
|
mandateId=mandateId
|
|
),
|
|
Prompt(
|
|
name="Meeting Protocol",
|
|
content="Create a detailed protocol of our meeting on [TOPIC]. Capture all discussed points, decisions made, and agreed measures. Structure the protocol clearly with agenda items, participant list, and clear responsibilities for follow-up actions.",
|
|
mandateId=mandateId
|
|
),
|
|
Prompt(
|
|
name="UI/UX Design",
|
|
content="Develop a UI/UX design concept for [APPLICATION/WEBSITE]. Consider the target audience, main functions, and brand identity. Describe the visual design, navigation, interaction patterns, and information architecture. Explain how the design optimizes user-friendliness and user experience.",
|
|
mandateId=mandateId
|
|
),
|
|
Prompt(
|
|
name="Primzahlen",
|
|
content="Gib mir die ersten 1000 Primzahlen.",
|
|
mandateId=mandateId
|
|
),
|
|
Prompt(
|
|
name="E-Mail",
|
|
content="Bereite mir eine formelle E-Mail an peter.muster@domain.com vor, um meinen Termin von 10 Uhr auf Freitag zu scheiben.",
|
|
mandateId=mandateId
|
|
)
|
|
]
|
|
|
|
# Create prompts
|
|
for prompt in standardPrompts:
|
|
self.db.recordCreate(Prompt, prompt)
|
|
logger.info(f"Created standard prompt: {prompt.name}")
|
|
|
|
# Restore original user context if it existed
|
|
if currentUser:
|
|
self.setUserContext(currentUser)
|
|
else:
|
|
self.currentUser = None
|
|
self.userId = None
|
|
self.access = None
|
|
self.db.updateContext("") # Reset database context
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error initializing standard prompts: {str(e)}")
|
|
# Ensure we restore user context even if there's an error
|
|
if 'currentUser' in locals() and currentUser:
|
|
self.setUserContext(currentUser)
|
|
else:
|
|
self.currentUser = None
|
|
self.userId = None
|
|
self.access = None
|
|
self.db.updateContext("") # Reset database context
|
|
|
|
def _uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""Delegate to access control module."""
|
|
# First apply access control
|
|
filteredRecords = self.access.uam(model_class, recordset)
|
|
|
|
# Then filter out database-specific fields
|
|
cleanedRecords = []
|
|
for record in filteredRecords:
|
|
# Create a new dict with only non-database fields
|
|
cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')}
|
|
cleanedRecords.append(cleanedRecord)
|
|
|
|
return cleanedRecords
|
|
|
|
def _canModify(self, model_class: type, recordId: Optional[str] = None) -> bool:
|
|
"""Delegate to access control module."""
|
|
return self.access.canModify(model_class, recordId)
|
|
|
|
|
|
# Utilities
|
|
|
|
def getInitialId(self, model_class: type) -> Optional[str]:
|
|
"""Returns the initial ID for a table."""
|
|
return self.db.getInitialId(model_class)
|
|
|
|
|
|
|
|
# Prompt methods
|
|
|
|
def getAllPrompts(self) -> List[Prompt]:
|
|
"""Returns prompts based on user access level."""
|
|
try:
|
|
allPrompts = self.db.getRecordset(Prompt)
|
|
filteredPrompts = self._uam(Prompt, allPrompts)
|
|
|
|
# Convert to Prompt objects
|
|
return [Prompt.from_dict(prompt) for prompt in filteredPrompts]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting prompts: {str(e)}")
|
|
return []
|
|
|
|
def getPrompt(self, promptId: str) -> Optional[Prompt]:
|
|
"""Returns a prompt by ID if user has access."""
|
|
prompts = self.db.getRecordset(Prompt, recordFilter={"id": promptId})
|
|
if not prompts:
|
|
return None
|
|
|
|
filteredPrompts = self._uam(Prompt, prompts)
|
|
return Prompt.from_dict(filteredPrompts[0]) if filteredPrompts else None
|
|
|
|
def createPrompt(self, promptData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Creates a new prompt if user has permission."""
|
|
if not self._canModify(Prompt):
|
|
raise PermissionError("No permission to create prompts")
|
|
|
|
# Create prompt record
|
|
createdRecord = self.db.recordCreate(Prompt, promptData)
|
|
if not createdRecord or not createdRecord.get("id"):
|
|
raise ValueError("Failed to create prompt record")
|
|
|
|
return createdRecord
|
|
|
|
def updatePrompt(self, promptId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Updates a prompt if user has access."""
|
|
try:
|
|
# Get prompt
|
|
prompt = self.getPrompt(promptId)
|
|
if not prompt:
|
|
raise ValueError(f"Prompt {promptId} not found")
|
|
|
|
# Update prompt record directly with the update data
|
|
self.db.recordModify(Prompt, promptId, updateData)
|
|
|
|
# Clear cache to ensure fresh data
|
|
|
|
# Get updated prompt
|
|
updatedPrompt = self.getPrompt(promptId)
|
|
if not updatedPrompt:
|
|
raise ValueError("Failed to retrieve updated prompt")
|
|
|
|
return updatedPrompt.to_dict()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating prompt: {str(e)}")
|
|
raise ValueError(f"Failed to update prompt: {str(e)}")
|
|
|
|
def deletePrompt(self, promptId: str) -> bool:
|
|
"""Deletes a prompt if user has access."""
|
|
# Check if the prompt exists and user has access
|
|
prompt = self.getPrompt(promptId)
|
|
if not prompt:
|
|
return False
|
|
|
|
if not self._canModify(Prompt, promptId):
|
|
raise PermissionError(f"No permission to delete prompt {promptId}")
|
|
|
|
# Delete prompt
|
|
success = self.db.recordDelete(Prompt, promptId)
|
|
|
|
|
|
return success
|
|
|
|
# File Utilities
|
|
|
|
def checkForDuplicateFile(self, fileHash: str, fileName: str = None) -> Optional[FileItem]:
|
|
"""Checks if a file with the same hash already exists for the current user and mandate.
|
|
If fileName is provided, also checks for exact name+hash match.
|
|
Only returns files the current user has access to."""
|
|
# First get all files with the hash
|
|
allFilesWithHash = self.db.getRecordset(FileItem, recordFilter={
|
|
"fileHash": fileHash
|
|
})
|
|
|
|
# Filter by user access using UAM
|
|
accessibleFiles = self._uam(FileItem, allFilesWithHash)
|
|
|
|
if not accessibleFiles:
|
|
return None
|
|
|
|
# If fileName is provided, check for exact name+hash match first
|
|
if fileName:
|
|
for file in accessibleFiles:
|
|
# Skip files without fileName key or with None/empty fileName
|
|
if "fileName" not in file or not file["fileName"]:
|
|
continue
|
|
if file["fileName"] == fileName:
|
|
return FileItem(
|
|
id=file["id"],
|
|
mandateId=file["mandateId"],
|
|
fileName=file["fileName"],
|
|
mimeType=file["mimeType"],
|
|
fileHash=file["fileHash"],
|
|
fileSize=file["fileSize"],
|
|
creationDate=file["creationDate"]
|
|
)
|
|
|
|
# Return first valid file with matching hash (for general duplicate detection)
|
|
for file in accessibleFiles:
|
|
# Skip files without fileName key or with None/empty fileName
|
|
if "fileName" not in file or not file["fileName"]:
|
|
continue
|
|
# Use first valid file
|
|
return FileItem(
|
|
id=file["id"],
|
|
mandateId=file["mandateId"],
|
|
fileName=file["fileName"],
|
|
mimeType=file["mimeType"],
|
|
fileHash=file["fileHash"],
|
|
fileSize=file["fileSize"],
|
|
creationDate=file["creationDate"]
|
|
)
|
|
|
|
# If no valid files found, return None
|
|
return None
|
|
|
|
def getMimeType(self, fileName: str) -> str:
|
|
"""Determines the MIME type based on the file extension."""
|
|
import os
|
|
ext = os.path.splitext(fileName)[1].lower()[1:]
|
|
extensionToMime = {
|
|
"pdf": "application/pdf",
|
|
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
|
"doc": "application/msword",
|
|
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
"xls": "application/vnd.ms-excel",
|
|
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
|
"ppt": "application/vnd.ms-powerpoint",
|
|
"csv": "text/csv",
|
|
"txt": "text/plain",
|
|
"json": "application/json",
|
|
"xml": "application/xml",
|
|
"html": "text/html",
|
|
"htm": "text/html",
|
|
"jpg": "image/jpeg",
|
|
"jpeg": "image/jpeg",
|
|
"png": "image/png",
|
|
"gif": "image/gif",
|
|
"webp": "image/webp",
|
|
"svg": "image/svg+xml",
|
|
"py": "text/x-python",
|
|
"js": "application/javascript",
|
|
"css": "text/css"
|
|
}
|
|
return extensionToMime.get(ext.lower(), "application/octet-stream")
|
|
|
|
def isTextMimeType(self, mimeType: str) -> bool:
|
|
"""Determines if a MIME type represents a text-based format."""
|
|
textMimeTypes = {
|
|
'text/plain',
|
|
'text/html',
|
|
'text/css',
|
|
'text/javascript',
|
|
'text/x-python',
|
|
'text/csv',
|
|
'text/xml',
|
|
'application/json',
|
|
'application/xml',
|
|
'application/javascript',
|
|
'application/x-python',
|
|
'application/x-httpd-php',
|
|
'application/x-sh',
|
|
'application/x-shellscript',
|
|
'application/x-yaml',
|
|
'application/x-toml',
|
|
'application/x-markdown',
|
|
'application/x-latex',
|
|
'application/x-tex',
|
|
'application/x-rst',
|
|
'application/x-asciidoc',
|
|
'application/x-markdown',
|
|
'application/x-httpd-php',
|
|
'application/x-httpd-php-source',
|
|
'application/x-httpd-php3',
|
|
'application/x-httpd-php4',
|
|
'application/x-httpd-php5',
|
|
'application/x-httpd-php7',
|
|
'application/x-httpd-php8',
|
|
'application/x-httpd-php-source',
|
|
'application/x-httpd-php3-source',
|
|
'application/x-httpd-php4-source',
|
|
'application/x-httpd-php5-source',
|
|
'application/x-httpd-php7-source',
|
|
'application/x-httpd-php8-source'
|
|
}
|
|
return mimeType.lower() in textMimeTypes
|
|
|
|
# File methods - metadata-based operations
|
|
|
|
def getAllFiles(self) -> List[FileItem]:
|
|
"""Returns files based on user access level."""
|
|
allFiles = self.db.getRecordset(FileItem)
|
|
filteredFiles = self._uam(FileItem, allFiles)
|
|
|
|
# Convert database records to FileItem instances
|
|
fileItems = []
|
|
for file in filteredFiles:
|
|
try:
|
|
# Ensure proper values, use defaults for invalid data
|
|
creationDate = file.get("creationDate")
|
|
if creationDate is None or not isinstance(creationDate, (int, float)) or creationDate <= 0:
|
|
creationDate = get_utc_timestamp()
|
|
|
|
fileName = file.get("fileName")
|
|
if not fileName or fileName == "None":
|
|
continue # Skip records with invalid fileName
|
|
|
|
fileItem = FileItem(
|
|
id=file.get("id"),
|
|
mandateId=file.get("mandateId"),
|
|
fileName=fileName,
|
|
mimeType=file.get("mimeType"),
|
|
fileHash=file.get("fileHash"),
|
|
fileSize=file.get("fileSize"),
|
|
creationDate=creationDate
|
|
)
|
|
fileItems.append(fileItem)
|
|
except Exception as e:
|
|
logger.warning(f"Skipping invalid file record: {str(e)}")
|
|
continue
|
|
|
|
return fileItems
|
|
|
|
def getFile(self, fileId: str) -> Optional[FileItem]:
|
|
"""Returns a file by ID if user has access."""
|
|
files = self.db.getRecordset(FileItem, recordFilter={"id": fileId})
|
|
if not files:
|
|
return None
|
|
|
|
filteredFiles = self._uam(FileItem, files)
|
|
if not filteredFiles:
|
|
return None
|
|
|
|
file = filteredFiles[0]
|
|
try:
|
|
# Get creation date from record or use current time
|
|
creationDate = file.get("creationDate")
|
|
if not creationDate:
|
|
creationDate = get_utc_timestamp()
|
|
|
|
return FileItem(
|
|
id=file.get("id"),
|
|
mandateId=file.get("mandateId"),
|
|
fileName=file.get("fileName"),
|
|
mimeType=file.get("mimeType"),
|
|
workflowId=file.get("workflowId"),
|
|
fileHash=file.get("fileHash"),
|
|
fileSize=file.get("fileSize"),
|
|
creationDate=creationDate
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error converting file record: {str(e)}")
|
|
return None
|
|
|
|
def _isfileNameUnique(self, fileName: str, excludeFileId: Optional[str] = None) -> bool:
|
|
"""Checks if a fileName is unique for the current user."""
|
|
# Get all files for current user
|
|
files = self.db.getRecordset(FileItem, recordFilter={
|
|
"_createdBy": self.currentUser.id
|
|
})
|
|
|
|
# Check if fileName exists (excluding the current file if updating)
|
|
for file in files:
|
|
# Skip files without fileName key or with None/empty fileName
|
|
if "fileName" not in file or not file["fileName"]:
|
|
continue
|
|
if file["fileName"] == fileName and (excludeFileId is None or file["id"] != excludeFileId):
|
|
return False
|
|
return True
|
|
|
|
def _generateUniquefileName(self, fileName: str, excludeFileId: Optional[str] = None) -> str:
|
|
"""Generates a unique fileName by adding a number if necessary."""
|
|
if self._isfileNameUnique(fileName, excludeFileId):
|
|
return fileName
|
|
|
|
# Split fileName into name and extension
|
|
name, ext = os.path.splitext(fileName)
|
|
counter = 1
|
|
|
|
# Try fileNames with increasing numbers until we find a unique one
|
|
while True:
|
|
newfileName = f"{name}_{counter}{ext}"
|
|
if self._isfileNameUnique(newfileName, excludeFileId):
|
|
return newfileName
|
|
counter += 1
|
|
|
|
def createFile(self, name: str, mimeType: str, content: bytes) -> FileItem:
|
|
"""Creates a new file entry if user has permission. Computes fileHash and fileSize from content."""
|
|
import hashlib
|
|
if not self._canModify(FileItem):
|
|
raise PermissionError("No permission to create files")
|
|
|
|
# Ensure fileName is unique
|
|
uniqueName = self._generateUniquefileName(name)
|
|
|
|
# Compute file size and hash
|
|
fileSize = len(content)
|
|
fileHash = hashlib.sha256(content).hexdigest()
|
|
|
|
# Ensure mandateId is valid
|
|
mandateId = self.currentUser.mandateId or "default"
|
|
|
|
# Create FileItem instance
|
|
fileItem = FileItem(
|
|
mandateId=mandateId,
|
|
fileName=uniqueName,
|
|
mimeType=mimeType,
|
|
fileSize=fileSize,
|
|
fileHash=fileHash
|
|
)
|
|
|
|
# Store in database
|
|
self.db.recordCreate(FileItem, fileItem)
|
|
|
|
|
|
return fileItem
|
|
|
|
def updateFile(self, fileId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Updates file metadata if user has access."""
|
|
# Check if the file exists and user has access
|
|
file = self.getFile(fileId)
|
|
if not file:
|
|
raise FileNotFoundError(f"File with ID {fileId} not found")
|
|
|
|
if not self._canModify(FileItem, fileId):
|
|
raise PermissionError(f"No permission to update file {fileId}")
|
|
|
|
# If fileName is being updated, ensure it's unique
|
|
if "fileName" in updateData:
|
|
updateData["fileName"] = self._generateUniquefileName(updateData["fileName"], fileId)
|
|
|
|
# Update file
|
|
success = self.db.recordModify(FileItem, fileId, updateData)
|
|
|
|
|
|
return success
|
|
|
|
def deleteFile(self, fileId: str) -> bool:
|
|
"""Deletes a file if user has access."""
|
|
try:
|
|
# Check if the file exists and user has access
|
|
file = self.getFile(fileId)
|
|
|
|
if not file:
|
|
raise FileNotFoundError(f"File with ID {fileId} not found")
|
|
|
|
if not self._canModify(FileItem, fileId):
|
|
raise PermissionError(f"No permission to delete file {fileId}")
|
|
|
|
# Check for other references to this file (by hash)
|
|
fileHash = file.fileHash
|
|
if fileHash:
|
|
otherReferences = [f for f in self.db.getRecordset(FileItem, recordFilter={"fileHash": fileHash})
|
|
if f["id"] != fileId]
|
|
|
|
# Only delete associated fileData if no other references exist
|
|
if not otherReferences:
|
|
try:
|
|
fileDataEntries = self.db.getRecordset(FileData, recordFilter={"id": fileId})
|
|
if fileDataEntries:
|
|
self.db.recordDelete(FileData, fileId)
|
|
logger.debug(f"FileData for file {fileId} deleted")
|
|
except Exception as e:
|
|
logger.warning(f"Error deleting FileData for file {fileId}: {str(e)}")
|
|
|
|
# Delete the FileItem entry
|
|
success = self.db.recordDelete(FileItem, fileId)
|
|
|
|
# Clear cache to ensure fresh data
|
|
|
|
return success
|
|
|
|
except FileNotFoundError as e:
|
|
raise
|
|
except FilePermissionError as e:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error deleting file {fileId}: {str(e)}")
|
|
raise FileDeletionError(f"Error deleting file: {str(e)}")
|
|
|
|
# FileData methods - data operations
|
|
|
|
def createFileData(self, fileId: str, data: bytes) -> bool:
|
|
"""Stores the binary data of a file in the database."""
|
|
try:
|
|
import base64
|
|
|
|
# Check file access
|
|
file = self.getFile(fileId)
|
|
if not file:
|
|
logger.error(f"File with ID {fileId} not found when storing data")
|
|
return False
|
|
|
|
# Determine if this is a text-based format
|
|
mimeType = file.mimeType
|
|
isTextFormat = self.isTextMimeType(mimeType)
|
|
|
|
base64Encoded = False
|
|
fileData = None
|
|
|
|
if isTextFormat:
|
|
# Try to decode as text
|
|
try:
|
|
textContent = data.decode('utf-8')
|
|
fileData = textContent
|
|
base64Encoded = False
|
|
logger.debug(f"Stored file {fileId} as text")
|
|
except UnicodeDecodeError:
|
|
# Fallback to base64 if text decoding fails
|
|
encodedData = base64.b64encode(data).decode('utf-8')
|
|
fileData = encodedData
|
|
base64Encoded = True
|
|
logger.warning(f"Failed to decode text file {fileId}, falling back to base64")
|
|
else:
|
|
# Binary format - always use base64
|
|
encodedData = base64.b64encode(data).decode('utf-8')
|
|
fileData = encodedData
|
|
base64Encoded = True
|
|
logger.debug(f"Stored file {fileId} as base64")
|
|
|
|
# Create the fileData record with data and encoding flag
|
|
fileDataObj = {
|
|
"id": fileId,
|
|
"data": fileData,
|
|
"base64Encoded": base64Encoded
|
|
}
|
|
|
|
self.db.recordCreate(FileData, fileDataObj)
|
|
|
|
# Clear cache to ensure fresh data
|
|
|
|
logger.debug(f"Successfully stored data for file {fileId} (base64Encoded: {base64Encoded})")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error storing data for file {fileId}: {str(e)}")
|
|
return False
|
|
|
|
def getFileData(self, fileId: str) -> Optional[bytes]:
|
|
"""Returns the binary data of a file if user has access."""
|
|
# Check file access
|
|
file = self.getFile(fileId)
|
|
if not file:
|
|
logger.warning(f"No access to file ID {fileId}")
|
|
return None
|
|
|
|
import base64
|
|
|
|
fileDataEntries = self.db.getRecordset(FileData, recordFilter={"id": fileId})
|
|
if not fileDataEntries:
|
|
logger.warning(f"No data found for file ID {fileId}")
|
|
return None
|
|
|
|
fileDataEntry = fileDataEntries[0]
|
|
if "data" not in fileDataEntry:
|
|
logger.warning(f"No data field in file data for ID {fileId}")
|
|
return None
|
|
|
|
data = fileDataEntry["data"]
|
|
base64Encoded = fileDataEntry.get("base64Encoded", False)
|
|
|
|
try:
|
|
if base64Encoded:
|
|
# Decode base64 to bytes
|
|
return base64.b64decode(data)
|
|
else:
|
|
# Check if this is supposed to be a binary file based on mime type
|
|
mimeType = file.mimeType
|
|
isTextFormat = self.isTextMimeType(mimeType)
|
|
|
|
if isTextFormat:
|
|
# This is a text file, encode to bytes as expected
|
|
return data.encode('utf-8')
|
|
else:
|
|
# This is a binary file that was incorrectly stored as text
|
|
# Try to decode it as if it was base64 (common fallback scenario)
|
|
try:
|
|
logger.warning(f"Binary file {fileId} ({mimeType}) was stored as text, attempting base64 decode")
|
|
return base64.b64decode(data)
|
|
except Exception as base64_error:
|
|
logger.error(f"Failed to decode binary file {fileId} as base64: {str(base64_error)}")
|
|
# Last resort: return the data as-is (might be corrupted)
|
|
logger.warning(f"Returning raw data for file {fileId} - file may be corrupted")
|
|
return data.encode('utf-8') if isinstance(data, str) else data
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing file data for {fileId}: {str(e)}")
|
|
return None
|
|
|
|
def getFileContent(self, fileId: str) -> Optional[FilePreview]:
|
|
"""Returns the full file content if user has access."""
|
|
try:
|
|
# Get file metadata
|
|
file = self.getFile(fileId)
|
|
if not file:
|
|
logger.warning(f"No access to file ID {fileId}")
|
|
return None
|
|
|
|
# Get file content
|
|
fileContent = self.getFileData(fileId)
|
|
if not fileContent:
|
|
logger.warning(f"No content found for file ID {fileId}")
|
|
return None
|
|
|
|
# Process content based on file type
|
|
isText = False
|
|
content = ""
|
|
encoding = None
|
|
|
|
# Use proper attribute access for FileItem object
|
|
if file.mimeType.startswith("text/"):
|
|
# For text files, return full content
|
|
try:
|
|
content = fileContent.decode('utf-8')
|
|
isText = True
|
|
encoding = 'utf-8'
|
|
except UnicodeDecodeError:
|
|
content = fileContent.decode('latin-1')
|
|
isText = True
|
|
encoding = 'latin-1'
|
|
elif file.mimeType.startswith("image/"):
|
|
# For images, return base64
|
|
import base64
|
|
content = base64.b64encode(fileContent).decode('utf-8')
|
|
isText = False
|
|
else:
|
|
# For other files, return as base64
|
|
import base64
|
|
content = base64.b64encode(fileContent).decode('utf-8')
|
|
isText = False
|
|
|
|
return FilePreview(
|
|
content=content,
|
|
mimeType=file.mimeType,
|
|
fileName=file.fileName,
|
|
isText=isText,
|
|
encoding=encoding,
|
|
size=file.fileSize
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting file content: {str(e)}")
|
|
return None
|
|
|
|
def saveUploadedFile(self, fileContent: bytes, fileName: str) -> tuple[FileItem, str]:
|
|
"""Saves an uploaded file if user has permission."""
|
|
try:
|
|
# Check file creation permission
|
|
if not self._canModify(FileItem):
|
|
raise PermissionError("No permission to upload files")
|
|
|
|
logger.debug(f"Starting upload process for file: {fileName}")
|
|
|
|
if not isinstance(fileContent, bytes):
|
|
logger.error(f"Invalid fileContent type: {type(fileContent)}")
|
|
raise ValueError(f"fileContent must be bytes, got {type(fileContent)}")
|
|
|
|
# Compute file hash first to check for duplicates
|
|
import hashlib
|
|
fileHash = hashlib.sha256(fileContent).hexdigest()
|
|
|
|
# Check for exact name+hash match first (same name + same content)
|
|
existingFile = self.checkForDuplicateFile(fileHash, fileName)
|
|
if existingFile:
|
|
logger.info(f"Exact duplicate detected: {fileName} with same hash. Returning existing file reference.")
|
|
return existingFile, "exact_duplicate"
|
|
|
|
# Check for hash-only match (same content, different name)
|
|
existingFileWithSameHash = self.checkForDuplicateFile(fileHash)
|
|
if existingFileWithSameHash:
|
|
logger.info(f"Content duplicate detected: {fileName} has same content as {existingFileWithSameHash.fileName}")
|
|
# Continue with upload - filename will be made unique if needed
|
|
|
|
# Determine MIME type
|
|
mimeType = self.getMimeType(fileName)
|
|
|
|
# Save metadata and file (hash/size computed inside createFile)
|
|
logger.debug(f"Saving file metadata to database for file: {fileName}")
|
|
fileItem = self.createFile(
|
|
name=fileName,
|
|
mimeType=mimeType,
|
|
content=fileContent
|
|
)
|
|
|
|
# Save binary data
|
|
logger.debug(f"Saving file content to database for file: {fileName}")
|
|
self.createFileData(fileItem.id, fileContent)
|
|
|
|
logger.debug(f"File upload process completed for: {fileName}")
|
|
|
|
# Check if filename was modified (indicating name conflict)
|
|
if fileItem.fileName != fileName:
|
|
return fileItem, "name_conflict"
|
|
else:
|
|
return fileItem, "new_file"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in saveUploadedFile for {fileName}: {str(e)}", exc_info=True)
|
|
raise FileStorageError(f"Error saving file: {str(e)}")
|
|
|
|
# VoiceSettings methods
|
|
|
|
def getVoiceSettings(self, userId: Optional[str] = None) -> Optional[VoiceSettings]:
|
|
"""Returns voice settings for a user if user has access."""
|
|
try:
|
|
targetUserId = userId or self.userId
|
|
if not targetUserId:
|
|
logger.error("No user ID provided for voice settings")
|
|
return None
|
|
|
|
# Get voice settings for the user
|
|
settings = self.db.getRecordset(VoiceSettings, recordFilter={"userId": targetUserId})
|
|
if not settings:
|
|
logger.debug(f"No voice settings found for user {targetUserId}")
|
|
return None
|
|
|
|
# Apply access control
|
|
filteredSettings = self._uam(VoiceSettings, settings)
|
|
if not filteredSettings:
|
|
logger.warning(f"No access to voice settings for user {targetUserId}")
|
|
return None
|
|
|
|
# Ensure timestamps are set for validation
|
|
settings_data = filteredSettings[0]
|
|
if not settings_data.get("creationDate"):
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
settings_data["creationDate"] = get_utc_timestamp()
|
|
if not settings_data.get("lastModified"):
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
settings_data["lastModified"] = get_utc_timestamp()
|
|
|
|
return VoiceSettings.from_dict(settings_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting voice settings: {str(e)}")
|
|
return None
|
|
|
|
def createVoiceSettings(self, settingsData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Creates voice settings for a user if user has permission."""
|
|
try:
|
|
if not self._canModify(VoiceSettings):
|
|
raise PermissionError("No permission to create voice settings")
|
|
|
|
# Ensure userId is set
|
|
if "userId" not in settingsData:
|
|
settingsData["userId"] = self.userId
|
|
|
|
# Ensure mandateId is set
|
|
if "mandateId" not in settingsData:
|
|
settingsData["mandateId"] = self.currentUser.mandateId if self.currentUser else "default"
|
|
|
|
# Check if settings already exist for this user
|
|
existingSettings = self.getVoiceSettings(settingsData["userId"])
|
|
if existingSettings:
|
|
raise ValueError(f"Voice settings already exist for user {settingsData['userId']}")
|
|
|
|
# Create voice settings record
|
|
createdRecord = self.db.recordCreate(VoiceSettings, settingsData)
|
|
if not createdRecord or not createdRecord.get("id"):
|
|
raise ValueError("Failed to create voice settings record")
|
|
|
|
logger.info(f"Created voice settings for user {settingsData['userId']}")
|
|
return createdRecord
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating voice settings: {str(e)}")
|
|
raise
|
|
|
|
def updateVoiceSettings(self, userId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Updates voice settings for a user if user has access."""
|
|
try:
|
|
# Get existing settings
|
|
existingSettings = self.getVoiceSettings(userId)
|
|
if not existingSettings:
|
|
raise ValueError(f"Voice settings not found for user {userId}")
|
|
|
|
# Update lastModified timestamp
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
updateData["lastModified"] = get_utc_timestamp()
|
|
|
|
# Update voice settings record
|
|
success = self.db.recordModify(VoiceSettings, existingSettings.id, updateData)
|
|
if not success:
|
|
raise ValueError("Failed to update voice settings record")
|
|
|
|
# Get updated settings
|
|
updatedSettings = self.getVoiceSettings(userId)
|
|
if not updatedSettings:
|
|
raise ValueError("Failed to retrieve updated voice settings")
|
|
|
|
logger.info(f"Updated voice settings for user {userId}")
|
|
return updatedSettings.to_dict()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating voice settings: {str(e)}")
|
|
raise
|
|
|
|
def deleteVoiceSettings(self, userId: str) -> bool:
|
|
"""Deletes voice settings for a user if user has access."""
|
|
try:
|
|
# Get existing settings
|
|
existingSettings = self.getVoiceSettings(userId)
|
|
if not existingSettings:
|
|
logger.warning(f"Voice settings not found for user {userId}")
|
|
return False
|
|
|
|
# Delete voice settings
|
|
success = self.db.recordDelete(VoiceSettings, existingSettings.id)
|
|
if success:
|
|
logger.info(f"Deleted voice settings for user {userId}")
|
|
else:
|
|
logger.error(f"Failed to delete voice settings for user {userId}")
|
|
|
|
return success
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error deleting voice settings: {str(e)}")
|
|
return False
|
|
|
|
def getOrCreateVoiceSettings(self, userId: Optional[str] = None) -> VoiceSettings:
|
|
"""Gets existing voice settings or creates default ones for a user."""
|
|
try:
|
|
targetUserId = userId or self.userId
|
|
if not targetUserId:
|
|
raise ValueError("No user ID provided for voice settings")
|
|
|
|
# Try to get existing settings
|
|
existingSettings = self.getVoiceSettings(targetUserId)
|
|
if existingSettings:
|
|
return existingSettings
|
|
|
|
# Create default settings
|
|
defaultSettings = {
|
|
"userId": targetUserId,
|
|
"mandateId": self.currentUser.mandateId if self.currentUser else "default",
|
|
"sttLanguage": "de-DE",
|
|
"ttsLanguage": "de-DE",
|
|
"ttsVoice": "de-DE-KatjaNeural",
|
|
"translationEnabled": True,
|
|
"targetLanguage": "en-US"
|
|
}
|
|
|
|
createdRecord = self.createVoiceSettings(defaultSettings)
|
|
return VoiceSettings.from_dict(createdRecord)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting or creating voice settings: {str(e)}")
|
|
raise
|
|
|
|
|
|
def getInterface(currentUser: Optional[User] = None) -> 'ComponentObjects':
|
|
"""
|
|
Returns a ComponentObjects instance.
|
|
If currentUser is provided, initializes with user context.
|
|
Otherwise, returns an instance with only database access.
|
|
"""
|
|
# Create new instance if not exists
|
|
if "default" not in _instancesManagement:
|
|
_instancesManagement["default"] = ComponentObjects()
|
|
|
|
interface = _instancesManagement["default"]
|
|
|
|
if currentUser:
|
|
interface.setUserContext(currentUser)
|
|
else:
|
|
logger.info("Returning interface without user context")
|
|
|
|
return interface |