gateway/modules/interfaces/interfaceChatObjects.py
2025-09-08 12:45:03 +02:00

1149 lines
No EOL
49 KiB
Python

"""
Interface to LucyDOM database and AI Connectors.
Uses the PostgreSQL database connector for data access with added language support.
"""
import os
import logging
import uuid
from datetime import datetime, UTC, timezone
from typing import Dict, Any, List, Optional, Union, get_origin, get_args
import asyncio
from modules.interfaces.interfaceChatAccess import ChatAccess
from modules.interfaces.interfaceChatModel import (
TaskStatus, UserInputRequest, ChatDocument, TaskItem, ChatStat, ChatLog, ChatMessage, ChatWorkflow, TaskAction, TaskResult, ActionResult
)
from modules.interfaces.interfaceAppModel import User
# DYNAMIC PART: Connectors to the Interface
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.timezoneUtils import get_utc_timestamp
# Basic Configurations
from modules.shared.configuration import APP_CONFIG
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Singleton factory for Chat instances
# NOTE(review): the code that fills this dict is not visible in this part of
# the file; presumably it caches ChatObjects instances - confirm the key used.
_chatInterfaces = {}
class ChatObjects:
"""
Interface to Chat database and AI Connectors.
Uses the PostgreSQL database connector for data access with added language support.
"""
def __init__(self, currentUser: Optional[User] = None):
    """
    Build the chat interface and connect it to the database.

    Args:
        currentUser: Optional user. When given, its context is applied via
            setUserContext() once the database connection exists; otherwise
            the instance keeps userId, mandateId and access set to None.
    """
    # Identity attributes stay None while no user is known
    self.currentUser = currentUser
    if currentUser:
        self.userId = currentUser.id
        self.mandateId = currentUser.mandateId
    else:
        self.userId = None
        self.mandateId = None
    # Access control is created later by setUserContext()
    self.access = None

    # Services first, then the database connection
    self._initializeServices()
    self._initializeDatabase()

    # Apply the user context only when a user was handed in
    if currentUser:
        self.setUserContext(currentUser)
# ===== Generic Utility Methods =====
def _is_object_field(self, field_type) -> bool:
"""Check if a field type represents a complex object (not a simple type)."""
# Simple scalar types
if field_type in (str, int, float, bool, type(None)):
return False
# Everything else is an object
return True
def _separate_object_fields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]:
"""Separate simple fields from object fields based on Pydantic model structure."""
simple_fields = {}
object_fields = {}
# Get field information from the Pydantic model
model_fields = {}
if hasattr(model_class, '__fields__'):
model_fields = model_class.__fields__
for field_name, value in data.items():
# Check if this field should be stored as JSONB in the database
if field_name in model_fields:
field_info = model_fields[field_name]
field_type = field_info.type_
# Check if this is a JSONB field (Dict, List, or complex types)
if (field_type == dict or
field_type == list or
(hasattr(field_type, '__origin__') and field_type.__origin__ in (dict, list)) or
field_name in ['execParameters', 'expectedDocumentFormats', 'resultDocuments']):
# Store as JSONB - include in simple_fields for database storage
simple_fields[field_name] = value
elif isinstance(value, (str, int, float, bool, type(None))):
# Simple scalar types
simple_fields[field_name] = value
else:
# Complex objects that should be filtered out
object_fields[field_name] = value
else:
# Field not in model - treat as scalar if simple, otherwise filter out
if isinstance(value, (str, int, float, bool, type(None))):
simple_fields[field_name] = value
else:
object_fields[field_name] = value
return simple_fields, object_fields
def _initializeServices(self):
pass
def setUserContext(self, currentUser: User):
"""Sets the user context for the interface."""
self.currentUser = currentUser # Store User object directly
self.userId = currentUser.id
self.mandateId = currentUser.mandateId
if not self.userId or not self.mandateId:
raise ValueError("Invalid user context: id and mandateId are required")
# Add language settings
self.userLanguage = currentUser.language # Default user language
# Initialize access control with user context
self.access = ChatAccess(self.currentUser, self.db) # Convert to dict only when needed
# Update database context
self.db.updateContext(self.userId)
def __del__(self):
"""Cleanup method to close database connection."""
if hasattr(self, 'db') and self.db is not None:
try:
self.db.close()
except Exception as e:
logger.error(f"Error closing database connection: {e}")
logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}")
def _initializeDatabase(self):
    """
    Open the chat database connection and store it on ``self.db``.

    Connection settings are read from APP_CONFIG (DB_CHAT_* keys); host,
    database name and port fall back to built-in defaults.

    Raises:
        Exception: Any error raised while reading the configuration,
            creating the connector or initializing the database system is
            logged and re-raised unchanged.
    """
    try:
        # Collect the connection settings, with defaults where defined
        connection = {
            "dbHost": APP_CONFIG.get("DB_CHAT_HOST", "_no_config_default_data"),
            "dbDatabase": APP_CONFIG.get("DB_CHAT_DATABASE", "chat"),
            "dbUser": APP_CONFIG.get("DB_CHAT_USER"),
            "dbPassword": APP_CONFIG.get("DB_CHAT_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_CHAT_PORT", 5432)),
        }
        # Create the connector directly and prepare the database system
        self.db = DatabaseConnector(**connection, userId=self.userId)
        self.db.initDbSystem()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize database: {str(e)}")
        raise
def _initRecords(self):
"""Initializes standard records in the database if they don't exist."""
pass
def _uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Delegate to access control module."""
# First apply access control
filteredRecords = self.access.uam(model_class, recordset)
# Then filter out database-specific fields
cleanedRecords = []
for record in filteredRecords:
# Create a new dict with only non-database fields
cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')}
cleanedRecords.append(cleanedRecord)
return cleanedRecords
def _canModify(self, model_class: type, recordId: Optional[str] = None) -> bool:
"""Delegate to access control module."""
return self.access.canModify(model_class, recordId)
# Utilities
def getInitialId(self, model_class: type) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(model_class)
# Workflow methods
def getWorkflows(self) -> List[Dict[str, Any]]:
    """
    List the workflows visible to the current user.

    Returns:
        Workflow records as dicts, after access filtering and removal of
        internal fields by _uam().
    """
    return self._uam(ChatWorkflow, self.db.getRecordset(ChatWorkflow))
def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]:
    """
    Load one workflow together with its logs, messages and stats.

    Args:
        workflowId: ID of the workflow to load.

    Returns:
        The populated ChatWorkflow, or None when the workflow does not
        exist, is filtered out by access control, or its data cannot be
        turned into a ChatWorkflow (the error is logged).
    """
    rows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    visibleRows = self._uam(ChatWorkflow, rows) if rows else []
    if not visibleRows:
        return None

    record = visibleRows[0]
    try:
        # Related data lives in separate (normalized) tables
        relatedLogs = self.getLogs(workflowId)
        relatedMessages = self.getMessages(workflowId)
        relatedStats = self.getWorkflowStats(workflowId)

        # Missing columns fall back to the defaults given here
        attributes = {
            "id": record["id"],
            "status": record.get("status", "running"),
            "name": record.get("name"),
            "currentRound": record.get("currentRound", 0),
            "currentTask": record.get("currentTask", 0),
            "currentAction": record.get("currentAction", 0),
            "totalTasks": record.get("totalTasks", 0),
            "totalActions": record.get("totalActions", 0),
            "lastActivity": record.get("lastActivity", get_utc_timestamp()),
            "startedAt": record.get("startedAt", get_utc_timestamp()),
            "logs": relatedLogs,
            "messages": relatedMessages,
            "stats": relatedStats,
            "mandateId": record.get("mandateId", self.currentUser.mandateId),
        }
        # Building the model validates the assembled data
        return ChatWorkflow(**attributes)
    except Exception as e:
        logger.error(f"Error validating workflow data: {str(e)}")
        return None
def createWorkflow(self, workflowData: Dict[str, Any]) -> ChatWorkflow:
    """
    Persist a new workflow and return it as a model.

    Args:
        workflowData: Workflow fields. ``startedAt`` and ``lastActivity``
            are added to this dict (in place) when missing.

    Returns:
        ChatWorkflow built from the created record, with empty logs and
        messages and no stats.

    Raises:
        PermissionError: If the user may not create workflows.
    """
    if not self._canModify(ChatWorkflow):
        raise PermissionError("No permission to create workflows")

    # Both timestamps default to the same creation time
    now = get_utc_timestamp()
    for timeField in ("startedAt", "lastActivity"):
        workflowData.setdefault(timeField, now)

    # Only the directly storable fields go into the workflow record
    storable, _ = self._separate_object_fields(ChatWorkflow, workflowData)
    row = self.db.recordCreate(ChatWorkflow, storable)

    # A fresh workflow has no related data yet
    attributes = {
        "id": row["id"],
        "status": row.get("status", "running"),
        "name": row.get("name"),
        "currentRound": row.get("currentRound", 0),
        "currentTask": row.get("currentTask", 0),
        "currentAction": row.get("currentAction", 0),
        "totalTasks": row.get("totalTasks", 0),
        "totalActions": row.get("totalActions", 0),
        "lastActivity": row.get("lastActivity", now),
        "startedAt": row.get("startedAt", now),
        "logs": [],
        "messages": [],
        "stats": None,
        "mandateId": row.get("mandateId", self.currentUser.mandateId),
    }
    return ChatWorkflow(**attributes)
def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> Optional[ChatWorkflow]:
    """
    Update a workflow record and its related logs, messages and stats.

    Storable fields are written to the workflow record. Entries under
    ``logs`` are created as new log records, entries under ``messages``
    are passed to updateMessage(), and ``stats`` is stored as a new stats
    record. Failures while handling these related objects are logged and
    do not abort the update.

    Args:
        workflowId: ID of the workflow to update.
        workflowData: Fields to change; may contain ``logs``, ``messages``
            and ``stats`` as dicts or as model objects.

    Returns:
        The refreshed ChatWorkflow, or None when the workflow does not
        exist or is not accessible.

    Raises:
        PermissionError: If the user may not modify the workflow.
    """
    def _asDict(item):
        # Model objects expose dict() or to_dict(); plain dicts pass through
        if hasattr(item, 'dict'):
            return item.dict()
        if hasattr(item, 'to_dict'):
            return item.to_dict()
        return item

    # Check if the workflow exists and user has access
    workflow = self.getWorkflow(workflowId)
    if not workflow:
        return None
    if not self._canModify(ChatWorkflow, workflowId):
        raise PermissionError(f"No permission to update workflow {workflowId}")

    # Use generic field separation based on ChatWorkflow model
    simple_fields, object_fields = self._separate_object_fields(ChatWorkflow, workflowData)
    # Set update time for main workflow
    simple_fields["lastActivity"] = get_utc_timestamp()
    # Update main workflow in database
    updated = self.db.recordModify(ChatWorkflow, workflowId, simple_fields)

    if 'logs' in object_fields:
        logs_data = object_fields['logs']
        try:
            for log_data in logs_data:
                log_dict = _asDict(log_data)
                log_dict["workflowId"] = workflowId
                self.createLog(log_dict)
            logger.debug(f"Updated {len(logs_data)} logs for workflow {workflowId}")
        except Exception as e:
            logger.error(f"Error updating workflow logs: {str(e)}")

    if 'messages' in object_fields:
        messages_data = object_fields['messages']
        try:
            for message_data in messages_data:
                msg_dict = _asDict(message_data)
                msg_dict["workflowId"] = workflowId
                self.updateMessage(msg_dict.get("id"), msg_dict)
            logger.debug(f"Updated {len(messages_data)} messages for workflow {workflowId}")
        except Exception as e:
            logger.error(f"Error updating workflow messages: {str(e)}")

    if 'stats' in object_fields:
        stats_data = object_fields['stats']
        try:
            if stats_data:
                # Stats may arrive as a model object; convert it the same
                # way as logs and messages before adding the workflow ID
                stats_dict = _asDict(stats_data)
                stats_dict["workflowId"] = workflowId
                self.db.recordCreate(ChatStat, stats_dict)
                logger.debug(f"Updated stats for workflow {workflowId}")
        except Exception as e:
            logger.error(f"Error updating workflow stats: {str(e)}")

    # Load fresh data from normalized tables
    logs = self.getLogs(workflowId)
    messages = self.getMessages(workflowId)
    stats = self.getWorkflowStats(workflowId)

    # Values missing from the updated record fall back to the state
    # loaded before the update
    return ChatWorkflow(
        id=updated["id"],
        status=updated.get("status", workflow.status),
        name=updated.get("name", workflow.name),
        currentRound=updated.get("currentRound", workflow.currentRound),
        currentTask=updated.get("currentTask", workflow.currentTask),
        currentAction=updated.get("currentAction", workflow.currentAction),
        totalTasks=updated.get("totalTasks", workflow.totalTasks),
        totalActions=updated.get("totalActions", workflow.totalActions),
        lastActivity=updated.get("lastActivity", workflow.lastActivity),
        startedAt=updated.get("startedAt", workflow.startedAt),
        logs=logs,
        messages=messages,
        stats=stats,
        mandateId=updated.get("mandateId", workflow.mandateId)
    )
def deleteWorkflow(self, workflowId: str) -> bool:
    """
    Delete a workflow together with its messages, documents, stats and logs.

    Document records are removed; the files they refer to are not touched.

    Args:
        workflowId: ID of the workflow to delete.

    Returns:
        The result of deleting the workflow record, or False when the
        workflow is missing or inaccessible, or when any error occurs -
        including a missing modify permission (errors are logged).
    """
    def _purge(model_class, recordFilter):
        # Delete every record of the given model matching the filter
        for row in self.db.getRecordset(model_class, recordFilter=recordFilter):
            self.db.recordDelete(model_class, row["id"])

    try:
        # The workflow must exist and be visible to the user
        if not self.getWorkflow(workflowId):
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to delete workflow {workflowId}")

        # 1. Messages, each preceded by its stats and document records
        for message in self.getMessages(workflowId):
            messageId = message.id
            if not messageId:
                continue
            _purge(ChatStat, {"messageId": messageId})
            _purge(ChatDocument, {"messageId": messageId})
            self.db.recordDelete(ChatMessage, messageId)

        # 2. Workflow-level stats, 3. workflow logs
        _purge(ChatStat, {"workflowId": workflowId})
        _purge(ChatLog, {"workflowId": workflowId})

        # 4. The workflow record itself goes last
        success = self.db.recordDelete(ChatWorkflow, workflowId)
        logger.debug(f"Successfully deleted workflow {workflowId} and all related data")
        return success
    except Exception as e:
        logger.error(f"Error deleting workflow {workflowId}: {str(e)}")
        return False
# Message methods
def getMessages(self, workflowId: str) -> List[ChatMessage]:
    """
    Return the messages of a workflow in chronological order.

    Each message is returned with its documents and its latest stats
    loaded from their own tables.

    Args:
        workflowId: ID of the workflow whose messages are loaded.

    Returns:
        ChatMessage objects ordered by ``publishedAt`` (falling back to
        ``timestamp``); an empty list when the workflow does not exist or
        is not accessible.
    """
    def _chronologicalKey(record):
        # Numeric timestamps (numbers or numeric strings) sort by value;
        # anything else sorts after them by its text form. This keeps the
        # sort from raising TypeError when values of different types, or
        # None, are mixed in one recordset.
        raw = record.get("publishedAt", record.get("timestamp", "0"))
        try:
            return (0, float(raw), "")
        except (TypeError, ValueError):
            return (1, 0.0, "" if raw is None else str(raw))

    # Check workflow access first (without calling getWorkflow to avoid circular reference)
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return []
    filteredWorkflows = self._uam(ChatWorkflow, workflows)
    if not filteredWorkflows:
        return []

    # Get messages for this workflow from normalized table
    messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
    # Sort messages to ensure chronological order
    messages.sort(key=_chronologicalKey)

    # Convert messages to ChatMessage objects and load documents
    chat_messages = []
    for msg in messages:
        # Load documents from normalized documents table
        documents = self.getDocuments(msg["id"])
        chat_messages.append(ChatMessage(
            id=msg["id"],
            workflowId=msg["workflowId"],
            parentMessageId=msg.get("parentMessageId"),
            documents=documents,
            documentsLabel=msg.get("documentsLabel"),
            message=msg.get("message"),
            role=msg.get("role", "assistant"),
            status=msg.get("status", "step"),
            sequenceNr=msg.get("sequenceNr", 0),
            publishedAt=msg.get("publishedAt", get_utc_timestamp()),
            stats=self.getMessageStats(msg["id"]),
            success=msg.get("success"),
            actionId=msg.get("actionId"),
            actionMethod=msg.get("actionMethod"),
            actionName=msg.get("actionName"),
            roundNumber=msg.get("roundNumber"),
            taskNumber=msg.get("taskNumber"),
            actionNumber=msg.get("actionNumber"),
            taskProgress=msg.get("taskProgress"),
            actionProgress=msg.get("actionProgress")
        ))
    return chat_messages
def createMessage(self, messageData: Dict[str, Any]) -> Optional[ChatMessage]:
    """
    Create a message (and its documents) for a workflow.

    Missing values are filled in on ``messageData`` in place: a generated
    ID, status ``"step"``, a role derived from ``agentName``, an empty
    ``agentName``, and the workflow's current round/task/action numbers.

    Args:
        messageData: Message fields; ``workflowId`` is required.
            ``documents`` may hold dicts or model objects.

    Returns:
        The created ChatMessage, or None when anything fails - missing
        fields, missing access or permission, or a database/validation
        error. The error is logged, not raised.
    """
    def _asDict(item):
        # Model objects expose dict() or to_dict(); plain dicts pass through
        if hasattr(item, 'dict'):
            return item.dict()
        if hasattr(item, 'to_dict'):
            return item.to_dict()
        return item

    try:
        # Generate an ID when none (or an empty one) was supplied; after
        # this point the ID is always set, so no later re-check is needed
        if not messageData.get("id"):
            messageData["id"] = f"msg_{uuid.uuid4()}"

        # Check required fields
        for field in ["id", "workflowId"]:
            if field not in messageData:
                logger.error(f"Required field '{field}' missing in messageData")
                raise ValueError(f"Required field '{field}' missing in message data")

        # Check workflow access
        workflowId = messageData["workflowId"]
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise PermissionError(f"No access to workflow {workflowId}")
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Default status for intermediate messages
        if "status" not in messageData:
            messageData["status"] = "step"
        # Ensure role and agentName are present
        if "role" not in messageData:
            messageData["role"] = "assistant" if messageData.get("agentName") else "user"
        if "agentName" not in messageData:
            messageData["agentName"] = ""

        # Fill in the workflow's current progress context when not provided,
        # so messages carry the right round/task/action on continued workflows
        progressDefaults = (
            ("roundNumber", workflow.currentRound),
            ("taskNumber", workflow.currentTask),
            ("actionNumber", workflow.currentAction),
        )
        for field, currentValue in progressDefaults:
            if field not in messageData:
                messageData[field] = currentValue
                logger.debug(f"Auto-setting {field} to {currentValue} for message {messageData['id']}")

        # Use generic field separation based on ChatMessage model
        simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData)
        # Documents are stored in their own table, not on the message record
        documents_to_create = object_fields.get("documents", [])

        # Create message in normalized table using only simple fields
        createdMessage = self.db.recordCreate(ChatMessage, simple_fields)

        # Create documents in normalized documents table
        created_documents = []
        for doc_data in documents_to_create:
            doc_dict = _asDict(doc_data)
            doc_dict["messageId"] = createdMessage["id"]
            created_doc = self.createDocument(doc_dict)
            if created_doc:
                created_documents.append(created_doc)

        # Convert to ChatMessage model
        return ChatMessage(
            id=createdMessage["id"],
            workflowId=createdMessage["workflowId"],
            parentMessageId=createdMessage.get("parentMessageId"),
            agentName=createdMessage.get("agentName"),
            documents=created_documents,
            documentsLabel=createdMessage.get("documentsLabel"),
            message=createdMessage.get("message"),
            role=createdMessage.get("role", "assistant"),
            status=createdMessage.get("status", "step"),
            # Sequence number is derived from the messages loaded with the workflow
            sequenceNr=len(workflow.messages) + 1,
            publishedAt=createdMessage.get("publishedAt", get_utc_timestamp()),
            stats=object_fields.get("stats"),
            roundNumber=createdMessage.get("roundNumber"),
            taskNumber=createdMessage.get("taskNumber"),
            actionNumber=createdMessage.get("actionNumber"),
            success=createdMessage.get("success"),
            actionId=createdMessage.get("actionId"),
            actionMethod=createdMessage.get("actionMethod"),
            actionName=createdMessage.get("actionName")
        )
    except Exception as e:
        logger.error(f"Error creating workflow message: {str(e)}")
        return None
def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Update a workflow message, creating it when it does not exist yet.

    For an existing message the storable fields are written to the message
    record, entries under ``documents`` are created as new document
    records and ``stats`` is stored as a new stats record. Failures while
    handling documents or stats are logged and do not abort the update.

    Args:
        messageId: ID of the message to update.
        messageData: Fields to change; must contain ``workflowId`` when the
            message has to be created.

    Returns:
        The record returned by the database connector, or None when the
        message does not exist and no ``workflowId`` was supplied.

    Raises:
        ValueError: On any failure, including an empty messageId and
            missing access or permission; the original exception is
            attached as the cause.
    """
    def _asDict(item):
        # Model objects expose dict() or to_dict(); plain dicts pass through
        if hasattr(item, 'dict'):
            return item.dict()
        if hasattr(item, 'to_dict'):
            return item.to_dict()
        return item

    try:
        logger.debug(f"Updating message {messageId} in database")
        # Ensure messageId is provided
        if not messageId:
            logger.error("No messageId provided for updateMessage")
            raise ValueError("messageId cannot be empty")

        # Check if message exists in database
        messages = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId})
        if not messages:
            logger.warning(f"Message with ID {messageId} does not exist in database")
            # If message doesn't exist but we have workflowId, create it
            if "workflowId" not in messageData:
                logger.error(f"Workflow ID missing for new message {messageId}")
                return None
            workflowId = messageData.get("workflowId")
            # Check workflow access
            workflow = self.getWorkflow(workflowId)
            if not workflow:
                raise PermissionError(f"No access to workflow {workflowId}")
            if not self._canModify(ChatWorkflow, workflowId):
                raise PermissionError(f"No permission to modify workflow {workflowId}")
            logger.info(f"Creating new message with ID {messageId} for workflow {workflowId}")
            # NOTE(review): unlike createMessage(), this path stores
            # messageData without separating nested objects - confirm the
            # connector copes with ``documents``/``stats`` entries here.
            return self.db.recordCreate(ChatMessage, messageData)

        # Update existing message
        existingMessage = messages[0]
        # Check workflow access
        workflowId = existingMessage.get("workflowId")
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise PermissionError(f"No access to workflow {workflowId}")
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Use generic field separation based on ChatMessage model
        simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData)

        # Ensure required fields present
        for key in ["role", "agentName"]:
            if key not in simple_fields and key not in existingMessage:
                simple_fields[key] = "assistant" if key == "role" else ""
        # Ensure ID is in the dataset
        if 'id' not in simple_fields:
            simple_fields['id'] = messageId
        # Convert createdAt to startedAt if needed
        if "createdAt" in simple_fields and "startedAt" not in simple_fields:
            simple_fields["startedAt"] = simple_fields["createdAt"]
            del simple_fields["createdAt"]

        # Update the message with simple fields only
        updatedMessage = self.db.recordModify(ChatMessage, messageId, simple_fields)

        if 'documents' in object_fields:
            documents_data = object_fields['documents']
            try:
                for doc_data in documents_data:
                    doc_dict = _asDict(doc_data)
                    doc_dict["messageId"] = messageId
                    self.createDocument(doc_dict)
                logger.debug(f"Updated {len(documents_data)} documents for message {messageId}")
            except Exception as e:
                logger.error(f"Error updating message documents: {str(e)}")

        if 'stats' in object_fields:
            stats_data = object_fields['stats']
            try:
                if stats_data:
                    # Stats may arrive as a model object; convert it the
                    # same way as documents before adding the message ID
                    stats_dict = _asDict(stats_data)
                    stats_dict["messageId"] = messageId
                    self.db.recordCreate(ChatStat, stats_dict)
                    logger.debug(f"Updated stats for message {messageId}")
            except Exception as e:
                logger.error(f"Error updating message stats: {str(e)}")

        if updatedMessage:
            logger.debug(f"Message {messageId} updated successfully")
        else:
            logger.warning(f"Failed to update message {messageId}")
        return updatedMessage
    except Exception as e:
        logger.error(f"Error updating message {messageId}: {str(e)}", exc_info=True)
        # Keep the original exception as the cause for easier debugging
        raise ValueError(f"Error updating message {messageId}: {str(e)}") from e
def deleteMessage(self, workflowId: str, messageId: str) -> bool:
    """
    Delete a workflow message together with its stats and document records.

    Document records are removed; the files they refer to are not touched.

    Args:
        workflowId: ID of the workflow the message belongs to.
        messageId: ID of the message to delete.

    Returns:
        The result of deleting the message record, or False when the
        workflow is inaccessible, the message is not part of the workflow,
        or any error occurs - including a missing modify permission
        (errors are logged).
    """
    def _messageId(entry):
        # getMessages() yields model objects; plain dicts are accepted too
        if isinstance(entry, dict):
            return entry.get("id")
        return getattr(entry, "id", None)

    try:
        # Check workflow access
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Check that the message exists within this workflow
        messages = self.getMessages(workflowId)
        message = next((m for m in messages if _messageId(m) == messageId), None)
        if not message:
            logger.warning(f"Message {messageId} for workflow {workflowId} not found")
            return False

        # CASCADE DELETE: Delete all related data first
        # 1. Delete message stats
        for stat in self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId}):
            self.db.recordDelete(ChatStat, stat["id"])
        # 2. Delete message documents (but NOT the files!)
        for doc in self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId}):
            self.db.recordDelete(ChatDocument, doc["id"])
        # 3. Finally delete the message itself
        success = self.db.recordDelete(ChatMessage, messageId)
        logger.debug(f"Successfully deleted message {messageId} and all related data")
        return success
    except Exception as e:
        logger.error(f"Error deleting message {messageId}: {str(e)}")
        return False
def deleteFileFromMessage(self, workflowId: str, messageId: str, fileId: str) -> bool:
    """
    Remove document records that reference a file from a message.

    A document matches when its ``id`` or ``fileId`` equals ``fileId`` or
    contains it as a substring. Only the document records are deleted.

    Args:
        workflowId: ID of the workflow used for the access check.
        messageId: ID of the message whose documents are searched.
        fileId: File (or document) ID to remove; must not be empty.

    Returns:
        True when at least one document record was deleted; False when
        nothing matched, ``fileId`` is empty, the workflow is inaccessible,
        or any error occurs - including a missing modify permission
        (errors are logged).
    """
    try:
        # Check workflow access
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # An empty ID is a substring of every string and would therefore
        # match - and delete - every document of the message
        if fileId is None or str(fileId) == "":
            logger.warning(f"No fileId provided for message {messageId}")
            return False

        logger.debug(f"Removing file {fileId} from message {messageId} in workflow {workflowId}")
        # NOTE(review): documents are looked up by messageId only; nothing
        # here verifies that the message belongs to workflowId - confirm
        # whether callers guarantee that.
        documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        if not documents:
            logger.warning(f"No documents found for message {messageId}")
            return False

        # Find and delete the matching documents
        removed = False
        needle = str(fileId)
        for doc in documents:
            docId = doc.get("id")
            fileIdValue = doc.get("fileId")
            # Flexible matching approach: exact or substring match on either ID
            shouldRemove = (
                (docId == fileId) or
                (fileIdValue == fileId) or
                (isinstance(docId, str) and needle in docId) or
                (isinstance(fileIdValue, str) and needle in fileIdValue)
            )
            if not shouldRemove:
                continue
            # Delete the document from normalized table
            if self.db.recordDelete(ChatDocument, docId):
                removed = True
                logger.debug(f"Successfully removed document {docId} (fileId: {fileIdValue})")
            else:
                logger.warning(f"Failed to delete document {docId}")

        if not removed:
            logger.warning(f"No matching file {fileId} found in message {messageId}")
            return False
        logger.debug(f"Successfully removed file {fileId} from message {messageId}")
        return True
    except Exception as e:
        logger.error(f"Error removing file {fileId} from message {messageId}: {str(e)}")
        return False
# Document methods
def getDocuments(self, messageId: str) -> List[ChatDocument]:
    """
    Load the document records attached to a message.

    Args:
        messageId: ID of the message whose documents are loaded.

    Returns:
        ChatDocument objects for the message; an empty list when the
        lookup or the model conversion fails (the error is logged).
    """
    try:
        rows = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        loaded = []
        for row in rows:
            loaded.append(ChatDocument(**row))
        return loaded
    except Exception as e:
        logger.error(f"Error getting message documents: {str(e)}")
        return []
def createDocument(self, documentData: Dict[str, Any]) -> ChatDocument:
    """
    Validate document data and store it as a document record.

    Args:
        documentData: Fields of the document, passed to the ChatDocument
            model for validation.

    Returns:
        ChatDocument built from the stored record, or None when validation
        or storage fails (the error is logged).
    """
    try:
        # Building the model validates the incoming data
        validated = ChatDocument(**documentData)
        # Store the validated document and rebuild the model from the result
        stored = self.db.recordCreate(ChatDocument, validated)
        result = ChatDocument(**stored)
        return result
    except Exception as e:
        logger.error(f"Error creating message document: {str(e)}")
        return None
# Log methods
def getLogs(self, workflowId: str) -> List[ChatLog]:
    """
    Return the log entries of a workflow ordered by timestamp.

    Args:
        workflowId: ID of the workflow whose logs are loaded.

    Returns:
        ChatLog objects sorted ascending by their ``timestamp`` converted
        to float (0 when missing); an empty list when the workflow does
        not exist or is not accessible.
    """
    # Access is checked on the workflow record directly; getWorkflow()
    # itself calls this method, so it cannot be used here
    workflowRows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflowRows or not self._uam(ChatWorkflow, workflowRows):
        return []

    logRows = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
    # Timestamps are compared numerically
    logRows.sort(key=lambda row: float(row.get("timestamp", 0)))
    return [ChatLog(**row) for row in logRows]
def createLog(self, logData: Dict[str, Any]) -> ChatLog:
    """
    Create a log entry for a workflow.

    Missing values are filled in on ``logData`` in place: the current
    timestamp, a status derived from ``type`` ("error" for error logs,
    otherwise "running") and a default progress for the types info (50),
    warning (50) and error (-1).

    Args:
        logData: Log fields; ``workflowId`` is required.

    Returns:
        ChatLog built from the stored record, or None when ``workflowId``
        is missing, the workflow is inaccessible or not modifiable, or the
        data does not validate (the reason is logged).
    """
    targetWorkflowId = logData.get("workflowId")
    if not targetWorkflowId:
        logger.error("No workflowId provided for createLog")
        return None
    if not self.getWorkflow(targetWorkflowId):
        logger.warning(f"No access to workflow {targetWorkflowId}")
        return None
    if not self._canModify(ChatWorkflow, targetWorkflowId):
        logger.warning(f"No permission to modify workflow {targetWorkflowId}")
        return None

    # Timestamp defaults to now
    if "timestamp" not in logData:
        logData["timestamp"] = get_utc_timestamp()

    # Status is derived from the log type when a type is given
    if "status" not in logData and "type" in logData:
        logData["status"] = "error" if logData["type"] == "error" else "running"

    # Default progress per log type; other types get no progress value
    if "progress" not in logData:
        logType = logData.get("type")
        for knownType, defaultProgress in (("info", 50), ("error", -1), ("warning", 50)):
            if logType == knownType:
                logData["progress"] = defaultProgress
                break

    # Building the model validates the log data
    try:
        validatedLog = ChatLog(**logData)
    except Exception as e:
        logger.error(f"Invalid log data: {str(e)}")
        return None

    # Store the log and return a model built from the stored record
    storedLog = self.db.recordCreate(ChatLog, validatedLog)
    return ChatLog(**storedLog)
# Stats methods
def getMessageStats(self, messageId: str) -> Optional[ChatStat]:
    """
    Return the stats record of a message that sorts first by ``created_at``.

    Args:
        messageId: ID of the message whose stats are loaded.

    Returns:
        ChatStat for the record with the highest ``created_at`` value
        (records without that key count as ""), or None when no stats
        exist or an error occurs (the error is logged).
    """
    try:
        rows = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
        if rows:
            # Descending order puts the most recent record first
            rows.sort(key=lambda row: row.get("created_at", ""), reverse=True)
            newest = rows[0]
            return ChatStat(**newest)
        return None
    except Exception as e:
        logger.error(f"Error getting message stats: {str(e)}")
        return None
def getWorkflowStats(self, workflowId: str) -> Optional[ChatStat]:
    """
    Return the stats record of a workflow that sorts first by ``created_at``.

    Args:
        workflowId: ID of the workflow whose stats are loaded.

    Returns:
        ChatStat for the record with the highest ``created_at`` value
        (records without that key count as ""), or None when the workflow
        does not exist, is not accessible, or has no stats.
    """
    # Access is checked on the workflow record directly; getWorkflow()
    # itself calls this method, so it cannot be used here
    workflowRows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflowRows or not self._uam(ChatWorkflow, workflowRows):
        return None

    statRows = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
    if not statRows:
        return None

    # Descending order puts the most recent record first
    statRows.sort(key=lambda row: row.get("created_at", ""), reverse=True)
    newest = statRows[0]
    return ChatStat(**newest)
def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool:
    """
    Record new cumulative statistics for a workflow.

    The given byte counts are added to the latest stored totals and a new
    stats record is created. ``tokenCount`` is stored as the sum of both
    byte totals. ``processingTime`` is the number of seconds between the
    workflow's ``startedAt`` and now; negative values and values above one
    year are stored as 0.

    Args:
        workflowId: ID of the workflow to update.
        bytesSent: Bytes sent since the last update.
        bytesReceived: Bytes received since the last update.

    Returns:
        True when the stats record was created; False when the workflow is
        missing, not modifiable, or an error occurs (the reason is logged).
    """
    try:
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.error(f"Workflow {workflowId} not found for stats update")
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            logger.error(f"No permission to update workflow {workflowId} stats")
            return False

        # Totals so far come from the latest stats record, if there is one
        previous = self.getWorkflowStats(workflowId)
        sentSoFar = (previous.bytesSent or 0) if previous else 0
        receivedSoFar = (previous.bytesReceived or 0) if previous else 0
        carriedTime = (previous.processingTime or 0) if previous else 0

        # Processing time is the duration since workflow start; the stored
        # value is kept when it cannot be computed
        elapsed = carriedTime
        if workflow.startedAt:
            try:
                startedAt = int(float(workflow.startedAt))
                now = int(get_utc_timestamp())
                elapsed = now - startedAt
                # Discard implausible durations (negative or over one year)
                if not 0 <= elapsed <= 86400 * 365:
                    elapsed = 0
            except Exception as e:
                logger.warning(f"Error calculating processing time: {str(e)}")
                elapsed = carriedTime

        # Add the increments to the running totals
        totalSent = sentSoFar + bytesSent
        totalReceived = receivedSoFar + bytesReceived

        # Each update is stored as a new stats record
        self.db.recordCreate(ChatStat, {
            "workflowId": workflowId,
            "processingTime": elapsed,
            "tokenCount": totalSent + totalReceived,
            "bytesSent": totalSent,
            "bytesReceived": totalReceived,
            "successRate": None,
            "errorCount": None
        })
        return True
    except Exception as e:
        logger.error(f"Error updating workflow stats: {str(e)}")
        return False
# Workflow Actions
async def workflowStart(self, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
    """
    Starts a new workflow or continues an existing one.

    When workflowId is given, the existing workflow is loaded, stopped first if its
    status is "running", and then resumed with its round counter incremented.
    Otherwise a new workflow is created, set to round 1 and given an initial stats
    record. In both cases the processing is scheduled as a background asyncio task
    and this method returns without waiting for it to finish.

    Args:
        currentUser: The user passed on to the WorkflowManager that processes the workflow
        userInput: The user input request containing workflow initialization data
        workflowId: Optional ID of an existing workflow to continue

    Returns:
        ChatWorkflow object representing the started/continued workflow

    Raises:
        ValueError: If workflowId is given but the workflow cannot be loaded,
            either initially or when reloading it after the update
        Exception: Any other error is logged and re-raised unchanged
    """
    try:
        # Get current timestamp
        currentTime = get_utc_timestamp()

        if workflowId:
            # Continue existing workflow - load complete state including messages
            workflow = self.getWorkflow(workflowId)
            if not workflow:
                raise ValueError(f"Workflow {workflowId} not found")

            # Check if workflow is currently running and stop it first
            if workflow.status == "running":
                logger.info(f"Stopping running workflow {workflowId} before processing new prompt")

                # Stop the running workflow
                workflow.status = "stopped"
                workflow.lastActivity = currentTime
                self.updateWorkflow(workflowId, {
                    "status": "stopped",
                    "lastActivity": currentTime
                })

                # Add log entry for workflow stop
                self.createLog({
                    "workflowId": workflowId,
                    "message": "Workflow stopped for new prompt",
                    "type": "info",
                    "status": "stopped",
                    "progress": 100
                })

                # Wait a moment for any running processes to detect the stop
                await asyncio.sleep(0.1)

            # Update workflow - increment round for existing workflows.
            # A missing (None) round counter counts as 0 so resuming does not
            # fail with a TypeError.
            newRound = (workflow.currentRound or 0) + 1
            self.updateWorkflow(workflowId, {
                "status": "running",  # Set status back to running for resumed workflows
                "lastActivity": currentTime,
                "currentRound": newRound
            })

            # Reload workflow object to get updated currentRound from database
            workflow = self.getWorkflow(workflowId)
            if not workflow:
                raise ValueError(f"Failed to reload workflow {workflowId} after update")

            # Add log entry for workflow resumption
            self.createLog({
                "workflowId": workflowId,
                "message": f"Workflow resumed (round {workflow.currentRound})",
                "type": "info",
                "status": "running",
                "progress": 0
            })
        else:
            # Create new workflow
            workflowData = {
                "name": "New Workflow",  # Default name since UserInputRequest doesn't have a name field
                "status": "running",
                "startedAt": currentTime,
                "lastActivity": currentTime,
                "currentRound": 0,  # Placeholder, set to 1 right after creation below
                "currentTask": 0,
                "currentAction": 0,
                "totalTasks": 0,
                "totalActions": 0,
                "mandateId": self.mandateId,
                "messageIds": [],
                "stats": {
                    "processingTime": None,
                    "tokenCount": None,
                    "bytesSent": None,
                    "bytesReceived": None,
                    "successRate": None,
                    "errorCount": None
                }
            }

            # Create workflow
            workflow = self.createWorkflow(workflowData)

            # Set currentRound to 1 for new workflows
            workflow.currentRound = 1
            self.updateWorkflow(workflow.id, {"currentRound": 1})

            # Initialize stats for the new workflow
            self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)

        # Start workflow processing (imported here rather than at module level)
        from modules.workflow.managerWorkflow import WorkflowManager
        workflowManager = WorkflowManager(self, currentUser)

        # Start the workflow processing asynchronously
        # The workflow will be updated with progress data during execution
        processingTask = asyncio.create_task(workflowManager.workflowProcess(userInput, workflow))

        # The event loop only keeps weak references to tasks, so a task without a
        # strong reference may be garbage-collected before it finishes. Keep it in a
        # class-level set until it is done, then drop it again.
        ownerClass = type(self)
        backgroundTasks = getattr(ownerClass, "_backgroundTasks", None)
        if backgroundTasks is None:
            backgroundTasks = set()
            ownerClass._backgroundTasks = backgroundTasks
        backgroundTasks.add(processingTask)
        processingTask.add_done_callback(backgroundTasks.discard)

        return workflow
    except Exception as e:
        logger.error(f"Error starting workflow: {str(e)}")
        raise
async def workflowStop(self, workflowId: str) -> ChatWorkflow:
    """
    Marks a workflow as stopped (State 8: Workflow Stopped).

    Args:
        workflowId: ID of the workflow to stop

    Returns:
        The loaded ChatWorkflow object with status and lastActivity updated

    Raises:
        ValueError: If no workflow is found for workflowId
        Exception: Any other error is logged and re-raised unchanged
    """
    try:
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise ValueError(f"Workflow {workflowId} not found")

        # Reflect the stop on the in-memory object first ...
        workflow.status = "stopped"
        workflow.lastActivity = get_utc_timestamp()

        # ... then persist the same values
        persistedFields = {
            "status": "stopped",
            "lastActivity": workflow.lastActivity
        }
        self.updateWorkflow(workflowId, persistedFields)

        # Record the stop in the workflow log
        stopLogEntry = {
            "workflowId": workflowId,
            "message": "Workflow stopped",
            "type": "warning",
            "status": "stopped",
            "progress": 100
        }
        self.createLog(stopLogEntry)

        return workflow
    except Exception as e:
        logger.error(f"Error stopping workflow: {str(e)}")
        raise
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
    """
    Returns the ChatObjects instance for the given user.

    Instances are cached in the module-level _chatInterfaces dict under a key built
    from the user's mandateId and id; a new ChatObjects is created on first request.

    Raises:
        ValueError: If currentUser is missing
    """
    if not currentUser:
        raise ValueError("Invalid user context: user is required")

    # One cached interface per mandate/user combination
    contextKey = f"{currentUser.mandateId}_{currentUser.id}"

    chatInterface = _chatInterfaces.get(contextKey)
    if chatInterface is None:
        chatInterface = ChatObjects(currentUser)
        _chatInterfaces[contextKey] = chatInterface
    return chatInterface