gateway/modules/interfaces/interfaceComponentObjects.py
2025-07-08 13:55:17 +02:00

894 lines
No EOL
35 KiB
Python

"""
Interface to Management database and AI Connectors.
Uses the JSON connector for data access with added language support.
"""
import os
import logging
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional, Union
import hashlib
from modules.interfaces.interfaceComponentAccess import ComponentAccess
from modules.interfaces.interfaceComponentModel import (
FilePreview, Prompt, FileItem, FileData
)
from modules.interfaces.interfaceAppModel import User
# DYNAMIC PART: Connectors to the Interface
from modules.connectors.connectorDbJson import DatabaseConnector
# Basic Configurations
from modules.shared.configuration import APP_CONFIG
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Cache of ComponentObjects instances used by getInterface().
# Only the "default" key is ever populated in this file, so all callers share one instance.
_instancesManagement = {}
# Custom exceptions for file handling
class FileError(Exception):
    """Common ancestor of every file-handling exception defined in this module."""
class FileNotFoundError(FileError):
    """Signals that a requested file could not be found."""
    # NOTE: within this module the name hides Python's builtin FileNotFoundError.
class FileStorageError(FileError):
    """Signals that storing a file failed."""
class FilePermissionError(FileError):
    """Signals a permission problem while working with a file."""
class FileDeletionError(FileError):
    """Signals that deleting a file failed."""
class ComponentObjects:
    """
    Interface to Management database and AI Connectors.
    Uses the JSON connector for data access with added language support.

    Offers prompt and file (metadata and content) operations on top of a
    DatabaseConnector. Read results are passed through a ComponentAccess
    object, which only exists after setUserContext() has been called with
    a user.
    """
def __init__(self):
"""Initializes the Management Interface."""
# Initialize variables first
self.currentUser: Optional[User] = None
self.userId: Optional[str] = None
self.access: Optional[ComponentAccess] = None # Will be set when user context is provided
# Initialize database
self._initializeDatabase()
# Initialize standard records if needed
self._initRecords()
def setUserContext(self, currentUser: User):
    """
    Sets the user context for the interface.

    Args:
        currentUser: User whose id and language are applied. A falsy value
            leaves the interface unchanged.

    Raises:
        ValueError: If currentUser has no id. No attribute is modified in
            that case.
    """
    if not currentUser:
        logger.info("Initializing interface without user context")
        return
    # Validate first so a rejected user cannot leave a half-applied context behind
    userId = currentUser.id
    if not userId:
        raise ValueError("Invalid user context: id is required")
    self.currentUser = currentUser  # Store User object directly
    self.userId = userId
    # Default language for this user
    self.userLanguage = currentUser.language
    # Access control is bound to both the user and the database connection
    self.access = ComponentAccess(self.currentUser, self.db)
    # Tell the connector which user subsequent operations belong to
    self.db.updateContext(self.userId)
    logger.debug(f"User context set: userId={self.userId}")
def _initializeDatabase(self):
    """Open the management database connection from APP_CONFIG settings; errors are logged and re-raised."""
    try:
        # Host and database name have fallbacks; user and password may be None
        settings = {
            "dbHost": APP_CONFIG.get("DB_MANAGEMENT_HOST", "_no_config_default_data"),
            "dbDatabase": APP_CONFIG.get("DB_MANAGEMENT_DATABASE", "management"),
            "dbUser": APP_CONFIG.get("DB_MANAGEMENT_USER"),
            "dbPassword": APP_CONFIG.get("DB_MANAGEMENT_PASSWORD_SECRET"),
        }
        # The host value is used as a directory path; make sure it exists
        os.makedirs(settings["dbHost"], exist_ok=True)
        self.db = DatabaseConnector(**settings)
        logger.info("Database initialized successfully")
    except Exception as initError:
        logger.error(f"Failed to initialize database: {str(initError)}")
        raise
def _initRecords(self):
    """Seed the default records; a failure is logged but never propagated."""
    try:
        # Currently only the standard prompts are seeded
        self._initializeStandardPrompts()
        logger.info("Standard records initialized successfully")
    except Exception as seedError:
        # Deliberately swallowed so the interface can still be constructed
        logger.error(f"Failed to initialize standard records: {str(seedError)}")
def _initializeStandardPrompts(self):
    """
    Initializes standard prompts if they don't exist yet.

    Does nothing when the "prompts" table already contains records.
    Otherwise the prompts are created while the "admin" user's context is
    active, and the context that was active before the call is restored
    afterwards, whether or not creation succeeded. Errors are logged, not
    raised.
    """
    try:
        if self.db.getRecordset("prompts"):
            logger.info("Prompts already exist, skipping initialization")
            return
        # Imported inside the function as in the original code
        # (presumably to avoid a circular import — TODO confirm)
        from modules.interfaces.interfaceAppObjects import getRootInterface
        rootInterface = getRootInterface()
        mandateId = rootInterface.getInitialId("mandates")
        if not mandateId:
            logger.error("No initial mandate ID found")
            return
        rootUser = rootInterface.getUserByUsername("admin")
        if not rootUser:
            logger.error("Root user not found for initialization")
            return
    except Exception as e:
        # The user context has not been switched yet, so nothing needs restoring
        logger.error(f"Error initializing standard prompts: {str(e)}")
        return

    # Remember the caller's context; the prompts are created as the root user
    previousUser = self.currentUser
    try:
        self.setUserContext(rootUser)
        standardPrompts = [
            Prompt(
                name="Market Research",
                content="Research the current market trends and developments in [TOPIC]. Collect information about leading companies, innovative products or services, and current challenges. Present the results in a structured overview with relevant data and sources.",
                mandateId=mandateId
            ),
            Prompt(
                name="Data Analysis",
                content="Analyze the attached dataset on [TOPIC] and identify the most important trends, patterns, and anomalies. Perform statistical calculations to support your findings. Present the results in a clearly structured analysis and draw relevant conclusions.",
                mandateId=mandateId
            ),
            Prompt(
                name="Meeting Protocol",
                content="Create a detailed protocol of our meeting on [TOPIC]. Capture all discussed points, decisions made, and agreed measures. Structure the protocol clearly with agenda items, participant list, and clear responsibilities for follow-up actions.",
                mandateId=mandateId
            ),
            Prompt(
                name="UI/UX Design",
                content="Develop a UI/UX design concept for [APPLICATION/WEBSITE]. Consider the target audience, main functions, and brand identity. Describe the visual design, navigation, interaction patterns, and information architecture. Explain how the design optimizes user-friendliness and user experience.",
                mandateId=mandateId
            ),
            Prompt(
                name="Primzahlen",
                content="Gib mir die ersten 1000 Primzahlen.",
                mandateId=mandateId
            ),
            Prompt(
                name="E-Mail",
                # Typo fix: the seeded text previously read "zu scheiben"
                content="Bereite mir eine formelle E-Mail an peter.muster@domain.com vor, um meinen Termin von 10 Uhr auf Freitag zu verschieben.",
                mandateId=mandateId
            )
        ]
        for prompt in standardPrompts:
            self.db.recordCreate("prompts", prompt.to_dict())
            logger.info(f"Created standard prompt: {prompt.name}")
    except Exception as e:
        logger.error(f"Error initializing standard prompts: {str(e)}")
    finally:
        # Single restore path for both success and failure
        if previousUser:
            self.setUserContext(previousUser)
        else:
            self.currentUser = None
            self.userId = None
            self.access = None
            self.db.updateContext("")  # Reset database context
def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Delegate to access control module."""
# First apply access control
filteredRecords = self.access.uam(table, recordset)
# Then filter out database-specific fields
cleanedRecords = []
for record in filteredRecords:
# Create a new dict with only non-database fields
cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')}
cleanedRecords.append(cleanedRecord)
return cleanedRecords
def _canModify(self, table: str, recordId: Optional[str] = None) -> bool:
"""Delegate to access control module."""
return self.access.canModify(table, recordId)
# Utilities
def getInitialId(self, table: str) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(table)
def _getCurrentTimestamp(self) -> str:
"""Returns the current timestamp in ISO format"""
return datetime.now().isoformat()
# Prompt methods
def getAllPrompts(self) -> List[Prompt]:
    """Return all prompts the access filter lets through; any error yields an empty list."""
    try:
        visibleRecords = self._uam("prompts", self.db.getRecordset("prompts"))
        # Turn each record dict into a Prompt object
        result = []
        for record in visibleRecords:
            result.append(Prompt.from_dict(record))
        return result
    except Exception as loadError:
        logger.error(f"Error getting prompts: {str(loadError)}")
        return []
def getPrompt(self, promptId: str) -> Optional[Prompt]:
    """Fetch one prompt by ID; None when it does not exist or is filtered out by access control."""
    matches = self.db.getRecordset("prompts", recordFilter={"id": promptId})
    if not matches:
        return None
    visible = self._uam("prompts", matches)
    if not visible:
        return None
    return Prompt.from_dict(visible[0])
def createPrompt(self, promptData: Dict[str, Any]) -> Dict[str, Any]:
"""Creates a new prompt if user has permission."""
if not self._canModify("prompts"):
raise PermissionError("No permission to create prompts")
# Create prompt record
createdRecord = self.db.recordCreate("prompts", promptData)
if not createdRecord or not createdRecord.get("id"):
raise ValueError("Failed to create prompt record")
return createdRecord
def updatePrompt(self, promptId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
"""Updates a prompt if user has access."""
try:
# Get prompt
prompt = self.getPrompt(promptId)
if not prompt:
raise ValueError(f"Prompt {promptId} not found")
# Update prompt record directly with the update data
self.db.recordModify("prompts", promptId, updateData)
# Get updated prompt
updatedPrompt = self.getPrompt(promptId)
if not updatedPrompt:
raise ValueError("Failed to retrieve updated prompt")
return updatedPrompt.to_dict()
except Exception as e:
logger.error(f"Error updating prompt: {str(e)}")
raise ValueError(f"Failed to update prompt: {str(e)}")
def deletePrompt(self, promptId: str) -> bool:
"""Deletes a prompt if user has access."""
# Check if the prompt exists and user has access
prompt = self.getPrompt(promptId)
if not prompt:
return False
if not self._canModify("prompts", promptId):
raise PermissionError(f"No permission to delete prompt {promptId}")
return self.db.recordDelete("prompts", promptId)
# File Utilities
def calculateFileHash(self, fileContent: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of the given bytes."""
    digest = hashlib.sha256()
    digest.update(fileContent)
    return digest.hexdigest()
def checkForDuplicateFile(self, fileHash: str) -> Optional[FileItem]:
    """Return the first file with this hash created by the current user in their mandate, or None."""
    matches = self.db.getRecordset("files", recordFilter={
        "fileHash": fileHash,
        "mandateId": self.currentUser.mandateId,
        "_createdBy": self.currentUser.id
    })
    if not matches:
        return None
    first = matches[0]
    # Only these fields are carried over into the returned FileItem
    copiedFields = ("id", "mandateId", "filename", "mimeType", "workflowId", "fileHash", "fileSize")
    return FileItem(**{field: first[field] for field in copiedFields})
def getMimeType(self, filename: str) -> str:
    """Map a filename's extension (case-insensitive) to a MIME type; unknown extensions give application/octet-stream."""
    _, dottedExtension = os.path.splitext(filename)
    # Strip the leading dot and normalise the case
    extension = dottedExtension[1:].lower()
    knownTypes = {
        # Documents
        "pdf": "application/pdf",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "doc": "application/msword",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "xls": "application/vnd.ms-excel",
        "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "ppt": "application/vnd.ms-powerpoint",
        # Text and data
        "csv": "text/csv",
        "txt": "text/plain",
        "json": "application/json",
        "xml": "application/xml",
        "html": "text/html",
        "htm": "text/html",
        # Images
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "png": "image/png",
        "gif": "image/gif",
        "webp": "image/webp",
        "svg": "image/svg+xml",
        # Source code
        "py": "text/x-python",
        "js": "application/javascript",
        "css": "text/css",
    }
    try:
        return knownTypes[extension]
    except KeyError:
        return "application/octet-stream"
def isTextMimeType(self, mimeType: str) -> bool:
    """
    Determines if a MIME type represents a text-based format.

    Args:
        mimeType: MIME type, optionally followed by parameters
            (e.g. "text/plain; charset=utf-8"). None or an empty string
            is accepted and counts as non-text.

    Returns:
        True when the type (without parameters, case-insensitive) is one
        of the known text-based types, otherwise False.
    """
    # A missing MIME type used to raise AttributeError on .lower()
    if not mimeType:
        return False
    textMimeTypes = {
        'text/plain',
        'text/html',
        'text/css',
        'text/javascript',
        'text/x-python',
        'text/csv',
        'text/xml',
        'application/json',
        'application/xml',
        'application/javascript',
        'application/x-python',
        'application/x-sh',
        'application/x-shellscript',
        'application/x-yaml',
        'application/x-toml',
        'application/x-markdown',
        'application/x-latex',
        'application/x-tex',
        'application/x-rst',
        'application/x-asciidoc',
        'application/x-httpd-php',
        'application/x-httpd-php3',
        'application/x-httpd-php4',
        'application/x-httpd-php5',
        'application/x-httpd-php7',
        'application/x-httpd-php8',
        'application/x-httpd-php-source',
        'application/x-httpd-php3-source',
        'application/x-httpd-php4-source',
        'application/x-httpd-php5-source',
        'application/x-httpd-php7-source',
        'application/x-httpd-php8-source',
    }
    # Compare only the "type/subtype" part; parameters such as charset are irrelevant here
    essence = mimeType.split(";", 1)[0].strip().lower()
    return essence in textMimeTypes
# File methods - metadata-based operations
def getAllFiles(self) -> List[FileItem]:
    """Return FileItem objects for every file record the access filter lets through."""
    visibleRecords = self._uam("files", self.db.getRecordset("files"))
    items = []
    for record in visibleRecords:
        try:
            # Records without a creation date get the current time instead
            created = record.get("creationDate") or datetime.now().isoformat()
            items.append(FileItem(
                id=record.get("id"),
                mandateId=record.get("mandateId"),
                filename=record.get("filename"),
                mimeType=record.get("mimeType"),
                workflowId=record.get("workflowId"),
                fileHash=record.get("fileHash"),
                fileSize=record.get("fileSize"),
                creationDate=created
            ))
        except Exception as conversionError:
            # One bad record must not hide the remaining files
            logger.warning(f"Skipping invalid file record: {str(conversionError)}")
    return items
def getFile(self, fileId: str) -> Optional[FileItem]:
    """Fetch one file's metadata by ID; None when missing, filtered out, or not convertible."""
    matches = self.db.getRecordset("files", recordFilter={"id": fileId})
    visible = self._uam("files", matches) if matches else None
    if not visible:
        return None
    record = visible[0]
    try:
        # Records without a creation date get the current time instead
        created = record.get("creationDate") or datetime.now().isoformat()
        return FileItem(
            id=record.get("id"),
            mandateId=record.get("mandateId"),
            filename=record.get("filename"),
            mimeType=record.get("mimeType"),
            workflowId=record.get("workflowId"),
            fileHash=record.get("fileHash"),
            fileSize=record.get("fileSize"),
            creationDate=created
        )
    except Exception as conversionError:
        logger.error(f"Error converting file record: {str(conversionError)}")
        return None
def _isFilenameUnique(self, filename: str, excludeFileId: Optional[str] = None) -> bool:
"""Checks if a filename is unique for the current user."""
# Get all files for current user
files = self.db.getRecordset("files", recordFilter={
"_createdBy": self.currentUser.id
})
# Check if filename exists (excluding the current file if updating)
for file in files:
if file["filename"] == filename and (excludeFileId is None or file["id"] != excludeFileId):
return False
return True
def _generateUniqueFilename(self, filename: str, excludeFileId: Optional[str] = None) -> str:
"""Generates a unique filename by adding a number if necessary."""
if self._isFilenameUnique(filename, excludeFileId):
return filename
# Split filename into name and extension
name, ext = os.path.splitext(filename)
counter = 1
# Try filenames with increasing numbers until we find a unique one
while True:
newFilename = f"{name}_{counter}{ext}"
if self._isFilenameUnique(newFilename, excludeFileId):
return newFilename
counter += 1
def createFile(self, name: str, mimeType: str, content: bytes) -> FileItem:
    """Create and store file metadata (unique name, size, SHA-256 hash of content); requires modify permission on "files"."""
    import hashlib
    if not self._canModify("files"):
        raise PermissionError("No permission to create files")
    # The stored name may differ from the requested one if it was already taken
    storedName = self._generateUniqueFilename(name)
    sizeInBytes = len(content)
    digest = hashlib.sha256(content).hexdigest()
    newItem = FileItem(
        mandateId=self.currentUser.mandateId,
        filename=storedName,
        mimeType=mimeType,
        fileSize=sizeInBytes,
        fileHash=digest
    )
    self.db.recordCreate("files", newItem.to_dict())
    return newItem
def updateFile(self, fileId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
"""Updates file metadata if user has access."""
# Check if the file exists and user has access
file = self.getFile(fileId)
if not file:
raise FileNotFoundError(f"File with ID {fileId} not found")
if not self._canModify("files", fileId):
raise PermissionError(f"No permission to update file {fileId}")
# If filename is being updated, ensure it's unique
if "filename" in updateData:
updateData["filename"] = self._generateUniqueFilename(updateData["filename"], fileId)
# Update file
return self.db.recordModify("files", fileId, updateData)
def deleteFile(self, fileId: str) -> bool:
    """
    Deletes a file if user has access.

    The file's "fileData" record is removed only when no other file
    record has the same hash; a failure there is logged and does not stop
    the metadata deletion.

    Args:
        fileId: ID of the file to delete.

    Returns:
        The result of the database connector's recordDelete for the file.

    Raises:
        FileNotFoundError: (this module's class) if the file is not
            visible to the user.
        FilePermissionError: If the user may not modify the file.
        FileDeletionError: For any other failure.
    """
    try:
        file = self.getFile(fileId)
        if not file:
            raise FileNotFoundError(f"File with ID {fileId} not found")
        if not self._canModify("files", fileId):
            # Raise the module's own permission error: the handler below re-raises
            # exactly this type, whereas a builtin PermissionError was being
            # swallowed into FileDeletionError
            raise FilePermissionError(f"No permission to delete file {fileId}")
        fileHash = file.fileHash
        if fileHash:
            sameHash = self.db.getRecordset("files", recordFilter={"fileHash": fileHash})
            hasOtherReferences = any(f["id"] != fileId for f in sameHash)
            # NOTE(review): fileData is looked up by file id, not by hash, so skipping
            # the delete here may leave this file's data record behind — confirm intent
            if not hasOtherReferences:
                try:
                    fileDataEntries = self.db.getRecordset("fileData", recordFilter={"id": fileId})
                    if fileDataEntries:
                        self.db.recordDelete("fileData", fileId)
                        logger.debug(f"FileData for file {fileId} deleted")
                except Exception as e:
                    # Best effort: the metadata record is still removed below
                    logger.warning(f"Error deleting FileData for file {fileId}: {str(e)}")
        return self.db.recordDelete("files", fileId)
    except (FileNotFoundError, FilePermissionError):
        raise
    except Exception as e:
        logger.error(f"Error deleting file {fileId}: {str(e)}")
        raise FileDeletionError(f"Error deleting file: {str(e)}") from e
# FileData methods - data operations
def createFileData(self, fileId: str, data: bytes) -> bool:
    """Store a file's content as UTF-8 text (text MIME types) or base64; returns False on any failure."""
    try:
        import base64
        file = self.getFile(fileId)
        if not file:
            logger.error(f"File with ID {fileId} not found when storing data")
            return False
        if self.isTextMimeType(file.mimeType):
            try:
                payload = data.decode('utf-8')
                isBase64 = False
                logger.debug(f"Stored file {fileId} as text")
            except UnicodeDecodeError:
                # Declared as text but not valid UTF-8: keep the raw bytes via base64
                payload = base64.b64encode(data).decode('utf-8')
                isBase64 = True
                logger.warning(f"Failed to decode text file {fileId}, falling back to base64")
        else:
            # Non-text types are always stored base64-encoded
            payload = base64.b64encode(data).decode('utf-8')
            isBase64 = True
            logger.debug(f"Stored file {fileId} as base64")
        # The flag tells readers how to turn "data" back into bytes
        self.db.recordCreate("fileData", {
            "id": fileId,
            "data": payload,
            "base64Encoded": isBase64
        })
        logger.debug(f"Successfully stored data for file {fileId} (base64Encoded: {isBase64})")
        return True
    except Exception as storeError:
        logger.error(f"Error storing data for file {fileId}: {str(storeError)}")
        return False
def getFileData(self, fileId: str) -> Optional[bytes]:
"""Returns the binary data of a file if user has access."""
# Check file access
file = self.getFile(fileId)
if not file:
logger.warning(f"No access to file ID {fileId}")
return None
import base64
fileDataEntries = self.db.getRecordset("fileData", recordFilter={"id": fileId})
if not fileDataEntries:
logger.warning(f"No data found for file ID {fileId}")
return None
fileDataEntry = fileDataEntries[0]
if "data" not in fileDataEntry:
logger.warning(f"No data field in file data for ID {fileId}")
return None
data = fileDataEntry["data"]
base64Encoded = fileDataEntry.get("base64Encoded", False)
try:
if base64Encoded:
# Decode base64 to bytes
return base64.b64decode(data)
else:
# Convert text to bytes
return data.encode('utf-8')
except Exception as e:
logger.error(f"Error processing file data for {fileId}: {str(e)}")
return None
def getFileContent(self, fileId: str) -> Optional[FilePreview]:
    """
    Returns the full file content if user has access.

    Content of "text/*" files is returned as text (UTF-8, falling back to
    latin-1). Everything else is returned as a base64 data URL
    ("data:<mimeType>;base64,<payload>").

    Args:
        fileId: ID of the file to read.

    Returns:
        A FilePreview, or None when the file is not accessible, has no
        (or empty) content, or an error occurs (errors are logged).
    """
    try:
        import base64
        file = self.getFile(fileId)
        if not file:
            logger.warning(f"No access to file ID {fileId}")
            return None
        fileContent = self.getFileData(fileId)
        if not fileContent:
            logger.warning(f"No content found for file ID {fileId}")
            return None
        # getFile returns a FileItem; read it by attribute like the other methods
        # of this class do (the previous dict-style access used .get()/[...] and
        # the keys "name"/"size" instead of filename/fileSize)
        mimeType = file.mimeType or "application/octet-stream"
        if mimeType.startswith("text/"):
            try:
                content = fileContent.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this fallback cannot fail
                content = fileContent.decode('latin-1')
            contentType = "text"
        else:
            # Images and all other binary types: real base64, not hex digits
            encoded = base64.b64encode(fileContent).decode('ascii')
            content = f"data:{mimeType};base64,{encoded}"
            contentType = "base64"
        return FilePreview(
            id=fileId,
            name=file.filename or "Unknown",
            mimeType=mimeType,
            size=file.fileSize or 0,
            content=content,
            contentType=contentType,
            # FileItem is not known to carry metadata; default to an empty dict
            metadata=getattr(file, "metadata", None) or {}
        )
    except Exception as e:
        logger.error(f"Error getting file content: {str(e)}")
        return None
def updateFileData(self, fileId: str, data: Union[bytes, str]) -> bool:
    """
    Updates file data if user has access.

    Text MIME types are stored as text; everything else is stored
    base64-encoded. A str passed for a non-text file is stored as-is only
    when it is strictly valid base64, otherwise its UTF-8 bytes are
    encoded.

    Args:
        fileId: ID of the file whose content is replaced.
        data: New content as bytes or str (other objects are converted
            with str()).

    Returns:
        True on success; False when the file is not accessible, the user
        may not modify it, or storing fails (all cases are logged).
    """
    file = self.getFile(fileId)
    if not file:
        logger.error(f"File with ID {fileId} not found when updating data")
        return False
    if not self._canModify("files", fileId):
        logger.error(f"No permission to update file data for {fileId}")
        return False
    try:
        import base64
        isTextFormat = self.isTextMimeType(file.mimeType)
        if isinstance(data, bytes):
            if isTextFormat:
                try:
                    fileData = data.decode('utf-8')
                    base64Encoded = False
                except UnicodeDecodeError:
                    # Declared as text but not valid UTF-8: keep the raw bytes
                    fileData = base64.b64encode(data).decode('utf-8')
                    base64Encoded = True
            else:
                fileData = base64.b64encode(data).decode('utf-8')
                base64Encoded = True
        else:
            isString = isinstance(data, str)
            stringData = data if isString else str(data)
            if isTextFormat:
                fileData = stringData
                base64Encoded = False
            else:
                alreadyEncoded = False
                if isString:
                    try:
                        # validate=True rejects characters outside the base64 alphabet;
                        # the lenient default silently drops them and so accepted
                        # almost any string as "already encoded"
                        base64.b64decode(stringData, validate=True)
                        alreadyEncoded = True
                    except ValueError:
                        # binascii.Error is a ValueError subclass; non-ASCII input
                        # raises ValueError as well
                        alreadyEncoded = False
                if alreadyEncoded:
                    fileData = stringData
                else:
                    fileData = base64.b64encode(stringData.encode('utf-8')).decode('utf-8')
                base64Encoded = True
        fileDataEntries = self.db.getRecordset("fileData", recordFilter={"id": fileId})
        dataUpdate = {
            "data": fileData,
            "base64Encoded": base64Encoded
        }
        if fileDataEntries:
            self.db.recordModify("fileData", fileId, dataUpdate)
            logger.debug(f"Updated file data for file ID {fileId} (base64Encoded: {base64Encoded})")
        else:
            # No content stored yet: create the record under the file's id
            dataUpdate["id"] = fileId
            self.db.recordCreate("fileData", dataUpdate)
            logger.debug(f"Created new file data for file ID {fileId} (base64Encoded: {base64Encoded})")
        return True
    except Exception as e:
        logger.error(f"Error updating data for file {fileId}: {str(e)}")
        return False
def saveUploadedFile(self, fileContent: bytes, fileName: str) -> FileItem:
    """
    Saves an uploaded file if user has permission.

    Creates the metadata record first and then stores the content. If the
    content cannot be stored, the metadata record is removed again (best
    effort) and the upload is reported as failed.

    Args:
        fileContent: Raw bytes of the upload.
        fileName: Name supplied with the upload; also used to derive the
            MIME type.

    Returns:
        The FileItem created for the upload.

    Raises:
        FileStorageError: For every failure, including missing permission
            and non-bytes content; the original message is included.
    """
    try:
        if not self._canModify("files"):
            raise PermissionError("No permission to upload files")
        logger.debug(f"Starting upload process for file: {fileName}")
        if not isinstance(fileContent, bytes):
            logger.error(f"Invalid fileContent type: {type(fileContent)}")
            raise ValueError(f"fileContent must be bytes, got {type(fileContent)}")
        mimeType = self.getMimeType(fileName)
        # Hash and size are computed inside createFile
        logger.debug(f"Saving file metadata to database for file: {fileName}")
        fileItem = self.createFile(
            name=fileName,
            mimeType=mimeType,
            content=fileContent
        )
        logger.debug(f"Saving file content to database for file: {fileName}")
        # createFileData reports failure through its return value, not an exception
        if not self.createFileData(fileItem.id, fileContent):
            try:
                # Do not leave a metadata record without content behind
                self.db.recordDelete("files", fileItem.id)
            except Exception as cleanupError:
                logger.warning(f"Could not remove metadata of failed upload {fileItem.id}: {str(cleanupError)}")
            raise FileStorageError(f"Content of file {fileName} could not be stored")
        logger.debug(f"File upload process completed for: {fileName}")
        return fileItem
    except Exception as e:
        logger.error(f"Error in saveUploadedFile for {fileName}: {str(e)}", exc_info=True)
        raise FileStorageError(f"Error saving file: {str(e)}") from e
def downloadFile(self, fileId: str) -> Optional[Dict[str, Any]]:
"""Returns a file for download if user has access."""
try:
# Check file access
file = self.getFile(fileId)
if not file:
raise FileNotFoundError(f"File with ID {fileId} not found")
# Get binary data
fileContent = self.getFileData(fileId)
if fileContent is None:
raise FileNotFoundError(f"Binary data for file with ID {fileId} not found")
return {
"id": fileId,
"name": file.filename,
"contentType": file.mimeType,
"size": file.fileSize,
"content": fileContent
}
except FileNotFoundError as e:
raise
except Exception as e:
logger.error(f"Error downloading file {fileId}: {str(e)}")
raise FileError(f"Error downloading file: {str(e)}")
def getInterface(currentUser: Optional[User] = None) -> 'ComponentObjects':
    """
    Return the shared ComponentObjects instance, creating it on first use.

    When currentUser is given, its context is applied to the shared
    instance before it is returned; otherwise the instance is returned
    as it currently is.
    """
    try:
        interface = _instancesManagement["default"]
    except KeyError:
        # First call: build the instance and cache it
        interface = _instancesManagement["default"] = ComponentObjects()
    # NOTE(review): every caller receives the same object, so a context set for one
    # user stays active for later calls made without a user — confirm this is intended
    if not currentUser:
        logger.info("Returning interface without user context")
        return interface
    interface.setUserContext(currentUser)
    return interface