1208 lines
No EOL
52 KiB
Python
1208 lines
No EOL
52 KiB
Python
"""
|
|
Interface to LucyDOM database and AI Connectors.
|
|
Uses the JSON connector for data access with added language support.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, UTC, timezone
|
|
from typing import Dict, Any, List, Optional, Union, get_origin, get_args
|
|
|
|
import asyncio
|
|
|
|
from modules.interfaces.interfaceDbChatAccess import ChatAccess
|
|
from modules.datamodels.datamodelChat import (
|
|
ActionItem,
|
|
TaskResult,
|
|
TaskItem,
|
|
TaskStatus,
|
|
ActionResult
|
|
)
|
|
from modules.datamodels.datamodelChat import (
|
|
UserInputRequest,
|
|
ChatDocument,
|
|
ChatStat,
|
|
ChatLog,
|
|
ChatMessage,
|
|
ChatWorkflow
|
|
)
|
|
from modules.datamodels.datamodelUam import User
|
|
|
|
# DYNAMIC PART: Connectors to the Interface
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
|
|
# Basic Configurations
|
|
from modules.shared.configuration import APP_CONFIG
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Singleton factory for Chat instances
|
|
_chatInterfaces = {}
|
|
|
|
class ChatObjects:
|
|
"""
|
|
Interface to Chat database and AI Connectors.
|
|
Uses the JSON connector for data access with added language support.
|
|
"""
|
|
|
|
def __init__(self, currentUser: Optional[User] = None):
|
|
"""Initializes the Chat Interface."""
|
|
# Initialize variables
|
|
self.currentUser = currentUser # Store User object directly
|
|
self.userId = currentUser.id if currentUser else None
|
|
self.mandateId = currentUser.mandateId if currentUser else None
|
|
self.access = None # Will be set when user context is provided
|
|
|
|
# Initialize services
|
|
self._initializeServices()
|
|
|
|
# Initialize database
|
|
self._initializeDatabase()
|
|
|
|
# Set user context if provided
|
|
if currentUser:
|
|
self.setUserContext(currentUser)
|
|
|
|
# ===== Generic Utility Methods =====
|
|
|
|
def _is_object_field(self, field_type) -> bool:
|
|
"""Check if a field type represents a complex object (not a simple type)."""
|
|
# Simple scalar types
|
|
if field_type in (str, int, float, bool, type(None)):
|
|
return False
|
|
|
|
# Everything else is an object
|
|
return True
|
|
|
|
def _separate_object_fields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]:
|
|
"""Separate simple fields from object fields based on Pydantic model structure."""
|
|
simple_fields = {}
|
|
object_fields = {}
|
|
|
|
# Get field information from the Pydantic model
|
|
model_fields = {}
|
|
if hasattr(model_class, '__fields__'):
|
|
model_fields = model_class.__fields__
|
|
|
|
for field_name, value in data.items():
|
|
# Check if this field should be stored as JSONB in the database
|
|
if field_name in model_fields:
|
|
field_info = model_fields[field_name]
|
|
field_type = field_info.type_
|
|
|
|
# Check if this is a JSONB field (Dict, List, or complex types)
|
|
if (field_type == dict or
|
|
field_type == list or
|
|
(hasattr(field_type, '__origin__') and field_type.__origin__ in (dict, list)) or
|
|
field_name in ['execParameters', 'expectedDocumentFormats', 'resultDocuments']):
|
|
# Store as JSONB - include in simple_fields for database storage
|
|
simple_fields[field_name] = value
|
|
elif isinstance(value, (str, int, float, bool, type(None))):
|
|
# Simple scalar types
|
|
simple_fields[field_name] = value
|
|
else:
|
|
# Complex objects that should be filtered out
|
|
object_fields[field_name] = value
|
|
else:
|
|
# Field not in model - treat as scalar if simple, otherwise filter out
|
|
if isinstance(value, (str, int, float, bool, type(None))):
|
|
simple_fields[field_name] = value
|
|
else:
|
|
object_fields[field_name] = value
|
|
|
|
return simple_fields, object_fields
|
|
|
|
def _initializeServices(self):
|
|
pass
|
|
|
|
def setUserContext(self, currentUser: User):
    """Bind this interface to a user.

    The user is validated before any instance state is touched, so a
    rejected user leaves the previous context (user, identifiers and
    access object) fully intact instead of half-replaced.

    Args:
        currentUser: User providing ``id``, ``mandateId`` and ``language``.

    Raises:
        ValueError: If the user has no id or no mandateId.
    """
    userId = currentUser.id
    mandateId = currentUser.mandateId

    # Validate first: nothing on self may change for an invalid user.
    if not userId or not mandateId:
        raise ValueError("Invalid user context: id and mandateId are required")

    self.currentUser = currentUser  # Store User object directly
    self.userId = userId
    self.mandateId = mandateId

    # Language setting taken from the user.
    self.userLanguage = currentUser.language

    # Access control is created from the user and the database connector.
    self.access = ChatAccess(self.currentUser, self.db)

    # Tell the database connector which user it now operates for.
    self.db.updateContext(self.userId)
|
|
|
|
def __del__(self):
|
|
"""Cleanup method to close database connection."""
|
|
if hasattr(self, 'db') and self.db is not None:
|
|
try:
|
|
self.db.close()
|
|
except Exception as e:
|
|
logger.error(f"Error closing database connection: {e}")
|
|
|
|
|
|
def _initializeDatabase(self):
    """Create the database connector from APP_CONFIG and initialize it.

    Reads the ``DB_CHAT_*`` settings, builds a ``DatabaseConnector`` for
    the current ``userId`` and calls ``initDbSystem()`` on it.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        # Connection settings; host, database and port have fallbacks.
        connectionSettings = {
            "dbHost": APP_CONFIG.get("DB_CHAT_HOST", "_no_config_default_data"),
            "dbDatabase": APP_CONFIG.get("DB_CHAT_DATABASE", "chat"),
            "dbUser": APP_CONFIG.get("DB_CHAT_USER"),
            "dbPassword": APP_CONFIG.get("DB_CHAT_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_CHAT_PORT", 5432)),
        }

        self.db = DatabaseConnector(userId=self.userId, **connectionSettings)
        self.db.initDbSystem()

        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize database: {str(e)}")
        raise
|
|
|
|
def _initRecords(self):
|
|
"""Initializes standard records in the database if they don't exist."""
|
|
pass
|
|
|
|
def _uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""Delegate to access control module."""
|
|
# First apply access control
|
|
filteredRecords = self.access.uam(model_class, recordset)
|
|
|
|
# Then filter out database-specific fields
|
|
cleanedRecords = []
|
|
for record in filteredRecords:
|
|
# Create a new dict with only non-database fields
|
|
cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')}
|
|
cleanedRecords.append(cleanedRecord)
|
|
|
|
return cleanedRecords
|
|
|
|
def _canModify(self, model_class: type, recordId: Optional[str] = None) -> bool:
|
|
"""Delegate to access control module."""
|
|
return self.access.canModify(model_class, recordId)
|
|
|
|
|
|
# Utilities
|
|
|
|
def getInitialId(self, model_class: type) -> Optional[str]:
    """Return whatever ``self.db.getInitialId`` reports for the model class."""
    initialId = self.db.getInitialId(model_class)
    return initialId
|
|
|
|
|
|
|
|
# Workflow methods
|
|
|
|
def getWorkflows(self) -> List[Dict[str, Any]]:
    """Return all workflow records that pass the access filter.

    Loads every ChatWorkflow record and passes the list through
    ``self._uam``.
    """
    return self._uam(ChatWorkflow, self.db.getRecordset(ChatWorkflow))
|
|
|
|
def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]:
    """Load one workflow together with its logs, messages and stats.

    Args:
        workflowId: ID of the workflow record to load.

    Returns:
        A ChatWorkflow built from the stored record, or None when no
        record matches, the access filter removes it, or building the
        model fails (the failure is logged).
    """
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return None

    filteredWorkflows = self._uam(ChatWorkflow, workflows)
    if not filteredWorkflows:
        return None

    workflow = filteredWorkflows[0]
    try:
        # Related data is loaded through the dedicated accessors.
        logs = self.getLogs(workflowId)
        messages = self.getMessages(workflowId)
        stats = self.getWorkflowStats(workflowId)

        modelArgs = {
            "id": workflow["id"],
            "status": workflow.get("status", "running"),
            "name": workflow.get("name"),
            "currentRound": workflow.get("currentRound", 0),
            "currentTask": workflow.get("currentTask", 0),
            "currentAction": workflow.get("currentAction", 0),
            "totalTasks": workflow.get("totalTasks", 0),
            "totalActions": workflow.get("totalActions", 0),
            "lastActivity": workflow.get("lastActivity", get_utc_timestamp()),
            "startedAt": workflow.get("startedAt", get_utc_timestamp()),
            "logs": logs,
            "messages": messages,
            "stats": stats,
            "mandateId": workflow.get("mandateId", self.currentUser.mandateId),
        }

        # createWorkflow returns these two fields; pass stored values
        # through so a reloaded workflow does not silently fall back to the
        # model defaults. Missing or None values keep the model defaults.
        for optionalKey in ("workflowMode", "maxSteps"):
            if workflow.get(optionalKey) is not None:
                modelArgs[optionalKey] = workflow[optionalKey]

        return ChatWorkflow(**modelArgs)
    except Exception as e:
        logger.error(f"Error validating workflow data: {str(e)}")
        return None
|
|
|
|
def createWorkflow(self, workflowData: Dict[str, Any]) -> ChatWorkflow:
    """Create a workflow record and return it as a ChatWorkflow.

    ``startedAt`` and ``lastActivity`` are added to ``workflowData`` (in
    place) when absent. Only the simple fields returned by
    ``_separate_object_fields`` are written to the database.

    Args:
        workflowData: Field values for the new workflow.

    Returns:
        ChatWorkflow for the created record, with empty logs and messages
        and no stats.

    Raises:
        PermissionError: If the access check denies creating workflows.
    """
    if not self._canModify(ChatWorkflow):
        raise PermissionError("No permission to create workflows")

    # Fill in both timestamps from a single clock reading.
    currentTime = get_utc_timestamp()
    for timeKey in ("startedAt", "lastActivity"):
        if timeKey not in workflowData:
            workflowData[timeKey] = currentTime

    # Object fields are not stored on the workflow record.
    persistable, _ = self._separate_object_fields(ChatWorkflow, workflowData)
    created = self.db.recordCreate(ChatWorkflow, persistable)

    # A new workflow has no related data yet.
    modelArgs = {
        "id": created["id"],
        "status": created.get("status", "running"),
        "name": created.get("name"),
        "currentRound": created.get("currentRound", 0),
        "currentTask": created.get("currentTask", 0),
        "currentAction": created.get("currentAction", 0),
        "totalTasks": created.get("totalTasks", 0),
        "totalActions": created.get("totalActions", 0),
        "lastActivity": created.get("lastActivity", currentTime),
        "startedAt": created.get("startedAt", currentTime),
        "logs": [],
        "messages": [],
        "stats": None,
        "mandateId": created.get("mandateId", self.currentUser.mandateId),
        "workflowMode": created.get("workflowMode", "Actionplan"),
        "maxSteps": created.get("maxSteps", 1),
    }
    return ChatWorkflow(**modelArgs)
|
|
|
|
def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> Optional[ChatWorkflow]:
    """Update a workflow record and persist any logs, messages or stats given.

    Simple fields are written to the workflow record (``lastActivity`` is
    always refreshed). Logs are created via ``createLog``, messages are
    passed to ``updateMessage`` and stats are stored as a new ChatStat
    record. Failures while persisting those related items are logged and
    do not abort the update.

    Args:
        workflowId: ID of the workflow to update.
        workflowData: Field values to apply; may contain ``logs``,
            ``messages`` and ``stats``.

    Returns:
        The refreshed ChatWorkflow, or None when the workflow does not
        exist or is not accessible.

    Raises:
        PermissionError: If the access check denies modifying the workflow.
    """

    def _toPlainDict(item):
        # Model-style objects expose dict()/to_dict(); plain dicts pass through.
        if hasattr(item, 'dict'):
            return item.dict()
        if hasattr(item, 'to_dict'):
            return item.to_dict()
        return item

    # Check if the workflow exists and user has access
    workflow = self.getWorkflow(workflowId)
    if not workflow:
        return None

    if not self._canModify(ChatWorkflow, workflowId):
        raise PermissionError(f"No permission to update workflow {workflowId}")

    simple_fields, object_fields = self._separate_object_fields(ChatWorkflow, workflowData)

    # Every update counts as activity.
    simple_fields["lastActivity"] = get_utc_timestamp()
    updated = self.db.recordModify(ChatWorkflow, workflowId, simple_fields)

    if 'logs' in object_fields:
        try:
            for log_data in object_fields['logs']:
                log_dict = _toPlainDict(log_data)
                log_dict["workflowId"] = workflowId
                self.createLog(log_dict)
        except Exception as e:
            logger.error(f"Error updating workflow logs: {str(e)}")

    if 'messages' in object_fields:
        try:
            for message_data in object_fields['messages']:
                msg_dict = _toPlainDict(message_data)
                msg_dict["workflowId"] = workflowId
                self.updateMessage(msg_dict.get("id"), msg_dict)
        except Exception as e:
            logger.error(f"Error updating workflow messages: {str(e)}")

    if 'stats' in object_fields:
        try:
            stats_data = object_fields['stats']
            if stats_data:
                # Stats may arrive as a model object just like logs and
                # messages; item assignment on it would raise, so convert
                # it first.
                stats_dict = _toPlainDict(stats_data)
                stats_dict["workflowId"] = workflowId
                self.db.recordCreate(ChatStat, stats_dict)
        except Exception as e:
            logger.error(f"Error updating workflow stats: {str(e)}")

    # Load fresh data from normalized tables
    logs = self.getLogs(workflowId)
    messages = self.getMessages(workflowId)
    stats = self.getWorkflowStats(workflowId)

    modelArgs = {
        "id": updated["id"],
        "status": updated.get("status", workflow.status),
        "name": updated.get("name", workflow.name),
        "currentRound": updated.get("currentRound", workflow.currentRound),
        "currentTask": updated.get("currentTask", workflow.currentTask),
        "currentAction": updated.get("currentAction", workflow.currentAction),
        "totalTasks": updated.get("totalTasks", workflow.totalTasks),
        "totalActions": updated.get("totalActions", workflow.totalActions),
        "lastActivity": updated.get("lastActivity", workflow.lastActivity),
        "startedAt": updated.get("startedAt", workflow.startedAt),
        "logs": logs,
        "messages": messages,
        "stats": stats,
        "mandateId": updated.get("mandateId", workflow.mandateId),
    }

    # createWorkflow returns these two fields; pass stored values through so
    # an update does not reset them to the model defaults.
    for optionalKey in ("workflowMode", "maxSteps"):
        if updated.get(optionalKey) is not None:
            modelArgs[optionalKey] = updated[optionalKey]

    return ChatWorkflow(**modelArgs)
|
|
|
|
def deleteWorkflow(self, workflowId: str) -> bool:
    """Delete a workflow together with its messages, documents, stats and logs.

    Document records are removed; nothing in this method touches the
    underlying files.

    Args:
        workflowId: ID of the workflow to delete.

    Returns:
        The result of deleting the workflow record, or False when the
        workflow is missing/inaccessible or any error occurs. A denied
        permission check is also caught here, logged and reported as False.
    """

    def _purge(modelClass, criteria):
        # Delete every record of modelClass matching the filter.
        for row in self.db.getRecordset(modelClass, recordFilter=criteria):
            self.db.recordDelete(modelClass, row["id"])

    try:
        if not self.getWorkflow(workflowId):
            return False

        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to delete workflow {workflowId}")

        # Dependent records go first, the workflow record last.
        for chatMessage in self.getMessages(workflowId):
            msgId = chatMessage.id
            if not msgId:
                continue
            _purge(ChatStat, {"messageId": msgId})
            _purge(ChatDocument, {"messageId": msgId})
            self.db.recordDelete(ChatMessage, msgId)

        _purge(ChatStat, {"workflowId": workflowId})
        _purge(ChatLog, {"workflowId": workflowId})

        return self.db.recordDelete(ChatWorkflow, workflowId)

    except Exception as e:
        logger.error(f"Error deleting workflow {workflowId}: {str(e)}")
        return False
|
|
|
|
|
|
# Message methods
|
|
|
|
def getMessages(self, workflowId: str) -> List[ChatMessage]:
    """Return the messages of a workflow in chronological order.

    Access is checked directly against the workflow record (not via
    ``getWorkflow``, which itself calls this method). Documents and stats
    are loaded per message.

    Args:
        workflowId: ID of the workflow whose messages are requested.

    Returns:
        ChatMessage objects sorted by ``publishedAt`` (falling back to
        ``timestamp``); an empty list when the workflow is missing or not
        accessible.
    """
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return []

    filteredWorkflows = self._uam(ChatWorkflow, workflows)
    if not filteredWorkflows:
        return []

    messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})

    def _chronologicalKey(record):
        # Records may lack a timestamp or carry it as number or string.
        # Comparing those raw values with each other raises TypeError, so
        # map them onto one comparable shape: numeric values first (missing
        # counts as 0), non-numeric values after them in string order.
        raw = record.get("publishedAt")
        if raw is None:
            raw = record.get("timestamp")
        if raw is None:
            return (0, 0.0, "")
        try:
            return (0, float(raw), "")
        except (TypeError, ValueError):
            return (1, 0.0, str(raw))

    messages.sort(key=_chronologicalKey)

    chat_messages = []
    for msg in messages:
        # Documents are stored in their own table.
        documents = self.getDocuments(msg["id"])

        messageArgs = {
            "id": msg["id"],
            "workflowId": msg["workflowId"],
            "parentMessageId": msg.get("parentMessageId"),
            "documents": documents,
            "documentsLabel": msg.get("documentsLabel"),
            "message": msg.get("message"),
            "role": msg.get("role", "assistant"),
            "status": msg.get("status", "step"),
            "sequenceNr": msg.get("sequenceNr", 0),
            "publishedAt": msg.get("publishedAt", get_utc_timestamp()),
            "stats": self.getMessageStats(msg["id"]),
            "success": msg.get("success"),
            "actionId": msg.get("actionId"),
            "actionMethod": msg.get("actionMethod"),
            "actionName": msg.get("actionName"),
            "roundNumber": msg.get("roundNumber"),
            "taskNumber": msg.get("taskNumber"),
            "actionNumber": msg.get("actionNumber"),
            "taskProgress": msg.get("taskProgress"),
            "actionProgress": msg.get("actionProgress"),
        }

        # createMessage stores agentName; carry it over when present so a
        # reloaded message matches what was created.
        if msg.get("agentName") is not None:
            messageArgs["agentName"] = msg["agentName"]

        chat_messages.append(ChatMessage(**messageArgs))

    return chat_messages
|
|
|
|
def createMessage(self, messageData: Dict[str, Any]) -> Optional[ChatMessage]:
    """Create a message (and its documents) for a workflow.

    Missing values are filled into ``messageData`` in place: ``id``,
    ``status``, ``role``, ``agentName`` and the round/task/action numbers
    taken from the workflow. Only simple fields are written to the message
    record; documents are created through ``createDocument``.

    Args:
        messageData: Field values for the message; ``workflowId`` is
            required.

    Returns:
        The created ChatMessage, or None when anything fails. Missing
        fields, denied access and database errors are all caught here and
        logged.
    """
    try:
        # Generate an ID when none (or an empty one) was supplied. After
        # this point the ID is always truthy, so no later re-check is needed.
        if "id" not in messageData or not messageData["id"]:
            messageData["id"] = f"msg_{uuid.uuid4()}"

        if "workflowId" not in messageData:
            logger.error("Required field 'workflowId' missing in messageData")
            raise ValueError("Required field 'workflowId' missing in message data")

        # Check workflow access
        workflowId = messageData["workflowId"]
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise PermissionError(f"No access to workflow {workflowId}")

        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Default status for intermediate messages
        if "status" not in messageData:
            messageData["status"] = "step"

        # A message with an agent name is attributed to the assistant.
        if "role" not in messageData:
            messageData["role"] = "assistant" if messageData.get("agentName") else "user"

        if "agentName" not in messageData:
            messageData["agentName"] = ""

        # Take progress context from the workflow when the caller gave none,
        # so messages of a continued workflow carry the current position.
        if "roundNumber" not in messageData:
            messageData["roundNumber"] = workflow.currentRound

        if "taskNumber" not in messageData:
            messageData["taskNumber"] = workflow.currentTask

        if "actionNumber" not in messageData:
            messageData["actionNumber"] = workflow.currentAction

        simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData)

        # Documents are stored in their own table, not on the message record.
        documents_to_create = object_fields.get("documents", [])

        createdMessage = self.db.recordCreate(ChatMessage, simple_fields)

        created_documents = []
        for doc_data in documents_to_create:
            # Model-style objects are converted to dicts first.
            if hasattr(doc_data, 'dict'):
                doc_dict = doc_data.dict()
            elif hasattr(doc_data, 'to_dict'):
                doc_dict = doc_data.to_dict()
            else:
                doc_dict = doc_data

            doc_dict["messageId"] = createdMessage["id"]
            created_doc = self.createDocument(doc_dict)
            if created_doc:
                created_documents.append(created_doc)

        chat_message = ChatMessage(
            id=createdMessage["id"],
            workflowId=createdMessage["workflowId"],
            parentMessageId=createdMessage.get("parentMessageId"),
            agentName=createdMessage.get("agentName"),
            documents=created_documents,
            documentsLabel=createdMessage.get("documentsLabel"),
            message=createdMessage.get("message"),
            role=createdMessage.get("role", "assistant"),
            status=createdMessage.get("status", "step"),
            # Position derived from the messages loaded with the workflow.
            sequenceNr=len(workflow.messages) + 1,
            publishedAt=createdMessage.get("publishedAt", get_utc_timestamp()),
            stats=object_fields.get("stats"),
            roundNumber=createdMessage.get("roundNumber"),
            taskNumber=createdMessage.get("taskNumber"),
            actionNumber=createdMessage.get("actionNumber"),
            success=createdMessage.get("success"),
            actionId=createdMessage.get("actionId"),
            actionMethod=createdMessage.get("actionMethod"),
            actionName=createdMessage.get("actionName")
        )

        # Debug storage only runs when enabled in the configuration.
        debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
        if debug_enabled:
            self._storeDebugMessageAndDocuments(chat_message)

        return chat_message

    except Exception as e:
        logger.error(f"Error creating workflow message: {str(e)}")
        return None
|
|
|
|
def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update a message record, creating it when it does not exist yet.

    Simple fields are written to the message record; documents are created
    through ``createDocument`` and stats are stored as a new ChatStat
    record. Failures while persisting documents or stats are logged and do
    not abort the update.

    Args:
        messageId: ID of the message to update.
        messageData: Field values to apply. ``workflowId`` is required when
            the message does not exist yet.

    Returns:
        The record returned by the database for the modified (or newly
        created) message; None when the message does not exist and no
        ``workflowId`` was given.

    Raises:
        ValueError: For any failure, including an empty ``messageId`` and
            denied access; the original exception is chained as the cause.
    """

    def _toPlainDict(item):
        # Model-style objects expose dict()/to_dict(); plain dicts pass through.
        if hasattr(item, 'dict'):
            return item.dict()
        if hasattr(item, 'to_dict'):
            return item.to_dict()
        return item

    def _persistRelated(related, ownerId):
        # Store documents and stats in their own tables, linked to ownerId.
        if 'documents' in related:
            try:
                # NOTE(review): every call creates the given documents again;
                # confirm callers never pass documents that already exist.
                for doc_data in related['documents']:
                    doc_dict = _toPlainDict(doc_data)
                    doc_dict["messageId"] = ownerId
                    self.createDocument(doc_dict)
            except Exception as e:
                logger.error(f"Error updating message documents: {str(e)}")
        if 'stats' in related:
            try:
                stats_data = related['stats']
                if stats_data:
                    # Stats may be a model object like the documents; item
                    # assignment on it would raise, so convert it first.
                    stats_dict = _toPlainDict(stats_data)
                    stats_dict["messageId"] = ownerId
                    self.db.recordCreate(ChatStat, stats_dict)
            except Exception as e:
                logger.error(f"Error updating message stats: {str(e)}")

    try:
        if not messageId:
            logger.error("No messageId provided for updateMessage")
            raise ValueError("messageId cannot be empty")

        messages = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId})
        if not messages:
            logger.warning(f"Message with ID {messageId} does not exist in database")

            # Without a workflow the message cannot be created.
            if "workflowId" not in messageData:
                logger.error(f"Workflow ID missing for new message {messageId}")
                return None

            workflowId = messageData.get("workflowId")

            workflow = self.getWorkflow(workflowId)
            if not workflow:
                raise PermissionError(f"No access to workflow {workflowId}")

            if not self._canModify(ChatWorkflow, workflowId):
                raise PermissionError(f"No permission to modify workflow {workflowId}")

            logger.info(f"Creating new message with ID {messageId} for workflow {workflowId}")

            # Same rule as createMessage: only simple fields go on the
            # message record, and the record must carry the requested ID so
            # later updates find it.
            new_fields, new_related = self._separate_object_fields(ChatMessage, messageData)
            if not new_fields.get("id"):
                new_fields["id"] = messageId
            createdRecord = self.db.recordCreate(ChatMessage, new_fields)
            _persistRelated(new_related, new_fields["id"])
            return createdRecord

        existingMessage = messages[0]

        # Access is decided by the workflow the stored message belongs to.
        workflowId = existingMessage.get("workflowId")
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise PermissionError(f"No access to workflow {workflowId}")

        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData)

        # Fill role/agentName only when neither the update nor the stored
        # record provides them.
        for key in ["role", "agentName"]:
            if key not in simple_fields and key not in existingMessage:
                simple_fields[key] = "assistant" if key == "role" else ""

        if 'id' not in simple_fields:
            simple_fields['id'] = messageId

        # Convert createdAt to startedAt if needed
        if "createdAt" in simple_fields and "startedAt" not in simple_fields:
            simple_fields["startedAt"] = simple_fields["createdAt"]
            del simple_fields["createdAt"]

        updatedMessage = self.db.recordModify(ChatMessage, messageId, simple_fields)

        _persistRelated(object_fields, messageId)

        if not updatedMessage:
            logger.warning(f"Failed to update message {messageId}")

        return updatedMessage

    except Exception as e:
        logger.error(f"Error updating message {messageId}: {str(e)}", exc_info=True)
        raise ValueError(f"Error updating message {messageId}: {str(e)}") from e
|
|
|
|
def deleteMessage(self, workflowId: str, messageId: str) -> bool:
    """Delete a message of a workflow together with its stats and documents.

    Document records are removed; nothing in this method touches the
    underlying files.

    Args:
        workflowId: ID of the workflow the message belongs to.
        messageId: ID of the message to delete.

    Returns:
        The result of deleting the message record, or False when the
        workflow is missing/inaccessible, the message is not part of the
        workflow, or any error occurs. A denied permission check is also
        caught here, logged and reported as False.
    """

    def _idOf(item):
        # getMessages returns model objects; dicts are accepted as well.
        if isinstance(item, dict):
            return item.get("id")
        return getattr(item, "id", None)

    try:
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False

        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # The message must be one of this workflow's messages.
        messages = self.getMessages(workflowId)
        message = next((m for m in messages if _idOf(m) == messageId), None)

        if not message:
            logger.warning(f"Message {messageId} for workflow {workflowId} not found")
            return False

        # Dependent records go first, the message record last.
        existing_stats = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
        for stat in existing_stats:
            self.db.recordDelete(ChatStat, stat["id"])

        existing_docs = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        for doc in existing_docs:
            self.db.recordDelete(ChatDocument, doc["id"])

        success = self.db.recordDelete(ChatMessage, messageId)

        return success

    except Exception as e:
        logger.error(f"Error deleting message {messageId}: {str(e)}")
        return False
|
|
|
|
def deleteFileFromMessage(self, workflowId: str, messageId: str, fileId: str) -> bool:
    """Remove the document record(s) of a message that reference a file.

    A document matches when its ``id`` or ``fileId`` equals ``fileId`` or
    contains it as a substring. Only document records are deleted.

    Args:
        workflowId: ID of the workflow the message belongs to.
        messageId: ID of the message holding the document.
        fileId: File or document identifier to remove.

    Returns:
        True when at least one document was deleted; False when access is
        missing, the message does not belong to the workflow, nothing
        matches, or any error occurs. A denied permission check is also
        caught here, logged and reported as False.
    """
    try:
        # An empty identifier would substring-match every document.
        if fileId is None or str(fileId) == "":
            logger.warning(f"No fileId provided for message {messageId}")
            return False

        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False

        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # The permission check above covers workflowId only, so make sure
        # the message really belongs to that workflow before deleting.
        owningMessages = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId})
        if not owningMessages or owningMessages[0].get("workflowId") != workflowId:
            logger.warning(f"Message {messageId} does not belong to workflow {workflowId}")
            return False

        documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})

        if not documents:
            logger.warning(f"No documents found for message {messageId}")
            return False

        wanted = str(fileId)
        removed = False
        for doc in documents:
            docId = doc.get("id")
            fileIdValue = doc.get("fileId")

            # Flexible matching: exact match or substring of either ID.
            shouldRemove = (
                (docId == fileId) or
                (fileIdValue == fileId) or
                (isinstance(docId, str) and wanted in docId) or
                (isinstance(fileIdValue, str) and wanted in fileIdValue)
            )

            if shouldRemove:
                success = self.db.recordDelete(ChatDocument, docId)
                if success:
                    removed = True
                else:
                    logger.warning(f"Failed to delete document {docId}")

        if not removed:
            logger.warning(f"No matching file {fileId} found in message {messageId}")
            return False

        return True

    except Exception as e:
        logger.error(f"Error removing file {fileId} from message {messageId}: {str(e)}")
        return False
|
|
|
|
# Document methods
|
|
|
|
def getDocuments(self, messageId: str) -> List[ChatDocument]:
    """Load the documents stored for a message.

    Returns:
        ChatDocument objects for all records with the given messageId; an
        empty list when loading or model construction fails (the failure is
        logged).
    """
    try:
        rows = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        loaded = []
        for row in rows:
            loaded.append(ChatDocument(**row))
        return loaded
    except Exception as e:
        logger.error(f"Error getting message documents: {str(e)}")
        return []
|
|
|
|
def createDocument(self, documentData: Dict[str, Any]) -> ChatDocument:
    """Validate and store a document record.

    Args:
        documentData: Field values for the document.

    Returns:
        ChatDocument built from the record the database returns, or None
        when validation or storing fails (the failure is logged).
    """
    try:
        # Building the model validates the input before it is stored.
        validated = ChatDocument(**documentData)
        stored = self.db.recordCreate(ChatDocument, validated)
        result = ChatDocument(**stored)
    except Exception as e:
        logger.error(f"Error creating message document: {str(e)}")
        return None
    else:
        return result
|
|
|
|
|
|
# Log methods
|
|
|
|
def getLogs(self, workflowId: str) -> List[ChatLog]:
    """Return the log entries of a workflow ordered by timestamp.

    Access is checked directly against the workflow record (not via
    ``getWorkflow``, which itself calls this method).

    Returns:
        ChatLog objects sorted ascending by ``timestamp``; an empty list
        when the workflow is missing or not accessible.
    """
    candidates = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not candidates or not self._uam(ChatWorkflow, candidates):
        return []

    rows = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})

    def _timestampOf(row):
        # Timestamps are compared numerically; a missing one counts as 0.
        return float(row.get("timestamp", 0))

    rows.sort(key=_timestampOf)

    return [ChatLog(**row) for row in rows]
|
|
|
|
def createLog(self, logData: Dict[str, Any]) -> ChatLog:
    """Create a log entry for a workflow.

    Missing ``timestamp``, ``status`` and ``progress`` values are filled
    into ``logData`` in place before the entry is validated and stored.

    Args:
        logData: Field values for the log entry; ``workflowId`` is required.

    Returns:
        ChatLog built from the record the database returns, or None when
        the workflowId is missing, access or permission is denied, or the
        data fails validation (each case is logged).
    """
    workflowId = logData.get("workflowId")
    if not workflowId:
        logger.error("No workflowId provided for createLog")
        return None

    if not self.getWorkflow(workflowId):
        logger.warning(f"No access to workflow {workflowId}")
        return None

    if not self._canModify(ChatWorkflow, workflowId):
        logger.warning(f"No permission to modify workflow {workflowId}")
        return None

    if "timestamp" not in logData:
        logData["timestamp"] = get_utc_timestamp()

    # Derive the status from the log type when only the type is given.
    if "status" not in logData and "type" in logData:
        logData["status"] = "error" if logData["type"] == "error" else "running"

    # Default progress per log type; other types get no default.
    if "progress" not in logData:
        logType = logData.get("type")
        if logType == "error":
            logData["progress"] = -1  # Error state
        elif logType in ("info", "warning"):
            logData["progress"] = 50  # Default middle progress

    try:
        log_model = ChatLog(**logData)
    except Exception as e:
        logger.error(f"Invalid log data: {str(e)}")
        return None

    createdLog = self.db.recordCreate(ChatLog, log_model)
    return ChatLog(**createdLog)
|
|
|
|
|
|
# Stats methods
|
|
|
|
def getMessageStats(self, messageId: str) -> Optional[ChatStat]:
    """Return the newest statistics record of a message.

    Returns:
        ChatStat for the record with the highest ``created_at`` value, or
        None when there are no records or loading fails (the failure is
        logged).
    """

    def _createdAt(row):
        return row.get("created_at", "")

    try:
        rows = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
        if not rows:
            return None

        # Newest first; the first entry is the one returned.
        rows.sort(key=_createdAt, reverse=True)
        newest = rows[0]
        return ChatStat(**newest)
    except Exception as e:
        logger.error(f"Error getting message stats: {str(e)}")
        return None
|
|
|
|
def getWorkflowStats(self, workflowId: str) -> Optional[ChatStat]:
    """
    Returns the most recent statistics record for a workflow if the user has access.

    Args:
        workflowId: ID of the workflow whose statistics are requested

    Returns:
        ChatStat built from the newest matching record, or None when the workflow
        does not exist, is filtered out by the access check, or has no stats.
    """
    def _recency(record: Dict[str, Any]) -> float:
        # NOTE(review): stat records are read with "_createdAt" elsewhere in this
        # file (getUnifiedChatData); "created_at" is kept as a fallback for any
        # record that still carries the old key — confirm which key the connector writes.
        for key in ("_createdAt", "created_at"):
            value = record.get(key)
            if value is None:
                continue
            try:
                return float(value)
            except (TypeError, ValueError):
                pass
            try:
                # Tolerate ISO-8601 strings so string timestamps still order correctly
                return datetime.fromisoformat(str(value)).timestamp()
            except (TypeError, ValueError):
                continue
        # No usable timestamp: treat the record as oldest
        return 0.0

    # Check workflow access first (without calling getWorkflow to avoid circular reference)
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return None

    filteredWorkflows = self._uam(ChatWorkflow, workflows)
    if not filteredWorkflows:
        return None

    # Get stats for this workflow from normalized table
    stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
    if not stats:
        return None

    # max() returns the first record on ties, which matches what the previous
    # stable descending sort followed by [0] produced; coercing the key to float
    # also avoids a TypeError when records mix missing and present timestamps
    return ChatStat(**max(stats, key=_recency))
|
|
|
|
def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None:
    """
    Adds the given increments to the statistics record of a workflow.

    When the workflow already has a stats record, the counters are summed with the
    stored values and the record is modified; otherwise a new record is created
    with the increments as initial values. Any exception is logged and swallowed.

    Args:
        workflowId: ID of the workflow to update
        bytesSent: Bytes sent (incremental)
        bytesReceived: Bytes received (incremental)
        tokenCount: Token count (incremental, default 0)
    """
    try:
        # The caller must be able to see the workflow ...
        if not self.getWorkflow(workflowId):
            logger.warning(f"No access to workflow {workflowId} for stats update")
            return

        # ... and to modify it
        if not self._canModify(ChatWorkflow, workflowId):
            logger.warning(f"No permission to modify workflow {workflowId} for stats update")
            return

        current = self.getWorkflowStats(workflowId)

        if not current:
            # First stats for this workflow: the increments are the totals
            payload = {
                "workflowId": workflowId,
                "bytesSent": bytesSent,
                "bytesReceived": bytesReceived,
                "tokenCount": tokenCount,
                "lastUpdated": get_utc_timestamp(),
            }
            self.db.recordCreate(ChatStat, payload)
        else:
            # Accumulate onto the stored counters (missing counters count as 0)
            payload = {
                "bytesSent": (current.bytesSent or 0) + bytesSent,
                "bytesReceived": (current.bytesReceived or 0) + bytesReceived,
                "tokenCount": (current.tokenCount or 0) + tokenCount,
                "lastUpdated": get_utc_timestamp(),
            }
            self.db.recordModify(ChatStat, current.id, payload)

        logger.debug(f"Updated workflow stats for {workflowId}: +{bytesSent} sent, +{bytesReceived} received, +{tokenCount} tokens")

    except Exception as e:
        logger.error(f"Error updating workflow stats for {workflowId}: {str(e)}")
|
|
|
|
def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]:
    """
    Collects messages, logs and stats of a workflow into one chronologically sorted list.

    Each entry is a dict with "type" ("message", "log" or "stat"), "createdAt"
    (the timestamp used for ordering) and "item" (the model dumped via .dict()).
    When afterTimestamp is given, only entries with a strictly greater timestamp
    are returned, which allows incremental polling.

    Args:
        workflowId: ID of the workflow to read
        afterTimestamp: Optional lower bound (exclusive) for entry timestamps

    Returns:
        {"items": [...]}; the list is empty when the workflow does not exist or
        is filtered out by the access check.
    """
    # Access check: the workflow must exist and survive the access filter
    workflowRows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflowRows or not self._uam(ChatWorkflow, workflowRows):
        return {"items": []}

    timeline = []

    def _alreadySeen(stamp) -> bool:
        # Timestamp filtering happens in Python (PostgreSQL connector doesn't support $gt operators)
        return afterTimestamp is not None and stamp <= afterTimestamp

    def _push(kind: str, stamp, model) -> None:
        timeline.append({
            "type": kind,
            "createdAt": stamp,
            "item": model.dict()
        })

    # Message fields copied as-is, with None when the record lacks them
    passthroughFields = (
        "success",
        "actionId",
        "actionMethod",
        "actionName",
        "roundNumber",
        "taskNumber",
        "actionNumber",
        "taskProgress",
        "actionProgress",
    )

    # Messages: ordered by publishedAt, enriched with their documents and stats
    for row in self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId}):
        publishedStamp = row.get("publishedAt", get_utc_timestamp())
        if _alreadySeen(publishedStamp):
            continue

        attachments = self.getDocuments(row["id"])

        messageModel = ChatMessage(
            id=row["id"],
            workflowId=row["workflowId"],
            parentMessageId=row.get("parentMessageId"),
            documents=attachments,
            documentsLabel=row.get("documentsLabel"),
            message=row.get("message"),
            role=row.get("role", "assistant"),
            status=row.get("status", "step"),
            sequenceNr=row.get("sequenceNr", 0),
            publishedAt=row.get("publishedAt", get_utc_timestamp()),
            stats=self.getMessageStats(row["id"]),
            **{fieldName: row.get(fieldName) for fieldName in passthroughFields}
        )
        _push("message", publishedStamp, messageModel)

    # Logs and stats share the same shape; only the model, the entry type and
    # the timestamp key differ
    simpleSources = (
        (ChatLog, "log", "timestamp"),
        (ChatStat, "stat", "_createdAt"),
    )
    for modelClass, kind, stampKey in simpleSources:
        for row in self.db.getRecordset(modelClass, recordFilter={"workflowId": workflowId}):
            stamp = row.get(stampKey, get_utc_timestamp())
            if _alreadySeen(stamp):
                continue
            _push(kind, stamp, modelClass(**row))

    # Chronological order across all entry types (stable for equal timestamps)
    return {"items": sorted(timeline, key=lambda entry: entry["createdAt"])}
|
|
|
|
def _storeDebugMessageAndDocuments(self, message: ChatMessage) -> None:
    """
    Dumps a chat message and its documents (metadata and file bytes) to disk for debugging.

    Layout below ./test-chat/messages:
        <timestamp>_m_<round>_<task>_<action>/
            message.json
            message_text.txt                     (only when the message has text)
            <sanitized documentsLabel>/
                document_###_metadata.json
                document_###_<original_filename>  (actual file bytes)

    Every failure is logged; nothing is raised to the caller.

    Args:
        message: ChatMessage object to store
    """
    try:
        import os
        import json
        from datetime import datetime, UTC

        # Base debug directory
        rootDir = "./test-chat/messages"
        os.makedirs(rootDir, exist_ok=True)

        # Millisecond-resolution UTC timestamp keeps folder names sortable
        stamp = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]

        def _counter(value) -> str:
            # Missing round/task/action numbers are written as "0"
            return "0" if value is None else str(value)

        folderName = (
            f"{stamp}_m_{_counter(message.roundNumber)}"
            f"_{_counter(message.taskNumber)}_{_counter(message.actionNumber)}"
        )
        messageDir = os.path.join(rootDir, folderName)
        os.makedirs(messageDir, exist_ok=True)

        # message.json: selected attributes copied by hand (no model_dump()),
        # with an always-empty "documents" list
        messageFields = (
            "id",
            "workflowId",
            "parentMessageId",
            "message",
            "role",
            "status",
            "sequenceNr",
            "publishedAt",
            "roundNumber",
            "taskNumber",
            "actionNumber",
            "documentsLabel",
            "actionId",
            "actionMethod",
            "actionName",
            "success",
        )
        with open(os.path.join(messageDir, "message.json"), "w", encoding="utf-8") as handle:
            snapshot = {fieldName: getattr(message, fieldName) for fieldName in messageFields}
            snapshot["documents"] = []
            json.dump(snapshot, handle, indent=2, ensure_ascii=False, default=str)

        # message_text.txt: plain message content
        if message.message:
            with open(os.path.join(messageDir, "message_text.txt"), "w", encoding="utf-8") as handle:
                handle.write(str(message.message))

        if message.documents:
            logger.info(f"Debug: Processing {len(message.documents)} documents")

            # Group documents by the message's documentsLabel
            grouped = {}
            for doc in message.documents:
                grouped.setdefault(message.documentsLabel or 'default', []).append(doc)

            metaFields = (
                "id",
                "messageId",
                "fileId",
                "fileName",
                "fileSize",
                "mimeType",
                "roundNumber",
                "taskNumber",
                "actionNumber",
                "actionId",
            )

            for label, members in grouped.items():
                # Keep only filesystem-friendly characters in the folder name
                cleaned = "".join(
                    ch for ch in str(label) if ch.isalnum() or ch in (' ', '-', '_')
                ).rstrip()
                cleaned = cleaned.replace(' ', '_') or "default"

                labelDir = os.path.join(messageDir, cleaned)
                os.makedirs(labelDir, exist_ok=True)
                logger.info(f"Debug: Created document folder: {labelDir}")

                for position, doc in enumerate(members, start=1):
                    prefix = f"document_{position:03d}"

                    # Metadata file
                    metadata = {fieldName: getattr(doc, fieldName) for fieldName in metaFields}
                    metaPath = os.path.join(labelDir, f"{prefix}_metadata.json")
                    with open(metaPath, "w", encoding="utf-8") as handle:
                        json.dump(metadata, handle, indent=2, ensure_ascii=False, default=str)

                    logger.info(f"Debug: Stored document metadata for {doc.fileName}")

                    # File bytes next to the metadata; a failure here only skips this file
                    try:
                        # Lazy import to avoid circular deps at module load
                        from modules.interfaces import interfaceDbComponentObjects as comp
                        payload = comp.getInterface(self.currentUser).getFileData(doc.fileId)
                        if not payload:
                            logger.warning(f"Debug: No file bytes returned for fileId {doc.fileId}")
                        else:
                            # basename() drops directory parts to avoid path traversal
                            baseName = os.path.basename(doc.fileName or prefix)
                            targetPath = os.path.join(labelDir, f"{prefix}_" + baseName)
                            with open(targetPath, "wb") as binaryHandle:
                                binaryHandle.write(payload)
                            logger.info(f"Debug: Stored document file bytes: {targetPath} ({len(payload)} bytes)")
                    except Exception as e:
                        logger.error(f"Debug: Failed to store document file for {doc.fileName} (fileId {doc.fileId}): {e}")

        # NOTE(review): this summary log is placed at function level, i.e. it is
        # also written for messages without documents — confirm the intended nesting.
        logger.info(f"Debug: Stored message and documents in {messageDir}")

    except Exception as e:
        logger.error(f"Debug: Failed to store message and documents: {e}")
        import traceback
        logger.error(f"Debug: Traceback: {traceback.format_exc()}")
|
|
|
|
|
|
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
    """
    Returns the ChatObjects instance belonging to the given user.

    Instances are cached in the module-level _chatInterfaces dict under the key
    "<mandateId>_<userId>"; a new ChatObjects is built on first request only.

    Args:
        currentUser: User the interface is requested for (required despite the default)

    Returns:
        The cached or newly created ChatObjects instance.

    Raises:
        ValueError: If no user is given.
    """
    if not currentUser:
        raise ValueError("Invalid user context: user is required")

    # One interface per mandate/user combination
    cacheKey = f"{currentUser.mandateId}_{currentUser.id}"

    if cacheKey in _chatInterfaces:
        return _chatInterfaces[cacheKey]

    # First request for this context: build and remember the instance
    instance = ChatObjects(currentUser)
    _chatInterfaces[cacheKey] = instance
    return instance