""" Interface to LucyDOM database and AI Connectors. Uses the JSON connector for data access with added language support. """ import os import logging import uuid import time from datetime import datetime, UTC, timezone from typing import Dict, Any, List, Optional, Union import asyncio from modules.interfaces.interfaceChatAccess import ChatAccess from modules.interfaces.interfaceChatModel import ( TaskStatus, UserInputRequest, ChatDocument, TaskItem, ChatStat, ChatLog, ChatMessage, ChatWorkflow, TaskAction, TaskResult, ActionResult ) from modules.interfaces.interfaceAppModel import User # DYNAMIC PART: Connectors to the Interface from modules.connectors.connectorDbJson import DatabaseConnector # Basic Configurations from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) # Singleton factory for Chat instances _chatInterfaces = {} class ChatObjects: """ Interface to Chat database and AI Connectors. Uses the JSON connector for data access with added language support. """ def __init__(self, currentUser: Optional[User] = None): """Initializes the Chat Interface.""" # Initialize variables self.currentUser = currentUser # Store User object directly self.userId = currentUser.id if currentUser else None self.mandateId = currentUser.mandateId if currentUser else None self.access = None # Will be set when user context is provided # Initialize services self._initializeServices() # Initialize database self._initializeDatabase() # Set user context if provided if currentUser: self.setUserContext(currentUser) def _initializeServices(self): pass def setUserContext(self, currentUser: User): """Sets the user context for the interface.""" self.currentUser = currentUser # Store User object directly self.userId = currentUser.id self.mandateId = currentUser.mandateId if not self.userId or not self.mandateId: raise ValueError("Invalid user context: id and mandateId are required") # Add language settings self.userLanguage = currentUser.language # Default user language # Initialize access control with user context self.access = ChatAccess(self.currentUser, self.db) # Convert to dict only when needed # Update database context self.db.updateContext(self.userId) logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}") def _initializeDatabase(self): """Initializes the database connection.""" try: # Get configuration values with defaults dbHost = APP_CONFIG.get("DB_CHAT_HOST", "_no_config_default_data") dbDatabase = APP_CONFIG.get("DB_CHAT_DATABASE", "chat") dbUser = APP_CONFIG.get("DB_CHAT_USER") dbPassword = APP_CONFIG.get("DB_CHAT_PASSWORD_SECRET") # Ensure the database directory exists os.makedirs(dbHost, exist_ok=True) self.db = DatabaseConnector( dbHost=dbHost, dbDatabase=dbDatabase, dbUser=dbUser, dbPassword=dbPassword ) logger.info("Database initialized successfully") except Exception as e: logger.error(f"Failed to initialize database: {str(e)}") raise def _initRecords(self): """Initializes standard records in the database if they don't exist.""" pass def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Delegate to access control module.""" # First apply access control filteredRecords = self.access.uam(table, recordset) # Then filter out database-specific fields cleanedRecords = [] for record in filteredRecords: # Create a new dict with only non-database fields cleanedRecord = {k: v for k, v in record.items() if not k.startswith('_')} cleanedRecords.append(cleanedRecord) return cleanedRecords def _canModify(self, table: str, recordId: Optional[str] = None) -> bool: """Delegate to access control module.""" return self.access.canModify(table, recordId) # Utilities def getInitialId(self, table: str) -> Optional[str]: """Returns the initial ID for a table.""" return self.db.getInitialId(table) def _getCurrentTimestamp(self) -> str: """Returns the current timestamp as Unix timestamp (seconds since epoch)""" return str(int(time.time())) # Workflow methods def getAllWorkflows(self) -> List[Dict[str, Any]]: """Returns workflows based on user access level.""" allWorkflows = self.db.getRecordset("workflows") return self._uam("workflows", allWorkflows) def getWorkflowsByUser(self, userId: str) -> List[Dict[str, Any]]: """Returns workflows for a specific user if current user has access.""" # Get workflows by userId workflows = self.db.getRecordset("workflows", recordFilter={"_createdBy": userId}) # Apply access control return self._uam("workflows", workflows) def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]: """Returns a workflow by ID if user has access.""" workflows = self.db.getRecordset("workflows", recordFilter={"id": workflowId}) if not workflows: return None filteredWorkflows = self._uam("workflows", workflows) if not filteredWorkflows: return None workflow = filteredWorkflows[0] try: # Validate workflow data against ChatWorkflow model return ChatWorkflow( id=workflow["id"], status=workflow.get("status", "running"), name=workflow.get("name"), currentRound=workflow.get("currentRound", 1), lastActivity=workflow.get("lastActivity", self._getCurrentTimestamp()), startedAt=workflow.get("startedAt", self._getCurrentTimestamp()), logs=[ChatLog(**log) for log in workflow.get("logs", [])], messages=[ChatMessage(**msg) for msg in workflow.get("messages", [])], stats=ChatStat(**workflow.get("dataStats", {})) if workflow.get("dataStats") else ChatStat( bytesSent=0, bytesReceived=0, tokenCount=0, processingTime=0 ), mandateId=workflow.get("mandateId", self.currentUser.mandateId) ) except Exception as e: logger.error(f"Error validating workflow data: {str(e)}") return None def createWorkflow(self, workflowData: Dict[str, Any]) -> ChatWorkflow: """Creates a new workflow if user has permission.""" if not self._canModify("workflows"): raise PermissionError("No permission to create workflows") # Set timestamp if not present currentTime = self._getCurrentTimestamp() if "startedAt" not in workflowData: workflowData["startedAt"] = currentTime if "lastActivity" not in workflowData: workflowData["lastActivity"] = currentTime # Create workflow in database created = self.db.recordCreate("workflows", workflowData) # Convert to ChatWorkflow model return ChatWorkflow( id=created["id"], status=created.get("status", "running"), name=created.get("name"), currentRound=created.get("currentRound", 1), lastActivity=created.get("lastActivity", currentTime), startedAt=created.get("startedAt", currentTime), logs=[], messages=[], stats=ChatStat(**created.get("dataStats", {})) if created.get("dataStats") else None, mandateId=created.get("mandateId", self.currentUser.mandateId) ) def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> ChatWorkflow: """Updates a workflow if user has access.""" # Check if the workflow exists and user has access workflow = self.getWorkflow(workflowId) if not workflow: return None if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to update workflow {workflowId}") # Set update time workflowData["lastActivity"] = self._getCurrentTimestamp() # Update workflow in database updated = self.db.recordModify("workflows", workflowId, workflowData) # Convert to ChatWorkflow model return ChatWorkflow( id=updated["id"], status=updated.get("status", workflow.status), name=updated.get("name", workflow.name), currentRound=updated.get("currentRound", workflow.currentRound), lastActivity=updated.get("lastActivity", workflow.lastActivity), startedAt=updated.get("startedAt", workflow.startedAt), logs=[ChatLog(**log) for log in updated.get("logs", workflow.logs)], messages=[ChatMessage(**msg) for msg in updated.get("messages", workflow.messages)], stats=ChatStat(**updated.get("dataStats", workflow.stats.dict() if workflow.stats else {})) if updated.get("dataStats") or workflow.stats else ChatStat( bytesSent=0, bytesReceived=0, tokenCount=0, processingTime=0 ), mandateId=updated.get("mandateId", workflow.mandateId) ) def deleteWorkflow(self, workflowId: str) -> bool: """Deletes a workflow if user has access.""" # Check if the workflow exists and user has access workflow = self.getWorkflow(workflowId) if not workflow: return False if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to delete workflow {workflowId}") # Delete workflow return self.db.recordDelete("workflows", workflowId) # Workflow Messages def getWorkflowMessages(self, workflowId: str) -> List[ChatMessage]: """Returns messages for a workflow if user has access to the workflow.""" # Check workflow access first workflow = self.getWorkflow(workflowId) if not workflow: return [] # Get messages for this workflow messages = self.db.getRecordset("workflowMessages", recordFilter={"workflowId": workflowId}) # Sort messages by publishedAt timestamp to ensure chronological order messages.sort(key=lambda x: x.get("publishedAt", x.get("timestamp", "0"))) return [ChatMessage(**msg) for msg in messages] def createWorkflowMessage(self, messageData: Dict[str, Any]) -> ChatMessage: """Creates a message for a workflow if user has access.""" try: # Ensure ID is present if "id" not in messageData or not messageData["id"]: messageData["id"] = f"msg_{uuid.uuid4()}" # Check required fields requiredFields = ["id", "workflowId"] for field in requiredFields: if field not in messageData: logger.error(f"Required field '{field}' missing in messageData") raise ValueError(f"Required field '{field}' missing in message data") # Check workflow access workflowId = messageData["workflowId"] workflow = self.getWorkflow(workflowId) if not workflow: raise PermissionError(f"No access to workflow {workflowId}") if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") # Validate that ID is not None if messageData["id"] is None: messageData["id"] = f"msg_{uuid.uuid4()}" logger.warning(f"Automatically generated ID for workflow message: {messageData['id']}") # Set status if not present if "status" not in messageData: messageData["status"] = "step" # Default status for intermediate messages # Ensure role and agentName are present if "role" not in messageData: messageData["role"] = "assistant" if messageData.get("agentName") else "user" if "agentName" not in messageData: messageData["agentName"] = "" # Convert ChatDocument objects to dictionaries for database storage if "documents" in messageData and messageData["documents"]: documents_for_db = [] for doc in messageData["documents"]: if isinstance(doc, ChatDocument): # Convert ChatDocument to dictionary documents_for_db.append(doc.dict()) else: # Already a dictionary documents_for_db.append(doc) messageData["documents"] = documents_for_db # Create message in database createdMessage = self.db.recordCreate("workflowMessages", messageData) # Convert to ChatMessage model return ChatMessage( id=createdMessage["id"], workflowId=createdMessage["workflowId"], parentMessageId=createdMessage.get("parentMessageId"), agentName=createdMessage.get("agentName"), documents=[ChatDocument(**doc) for doc in createdMessage.get("documents", [])], documentsLabel=createdMessage.get("documentsLabel"), # <-- FIX: ensure label is set message=createdMessage.get("message"), role=createdMessage.get("role", "assistant"), status=createdMessage.get("status", "step"), sequenceNr=len(workflow.messages) + 1, # Use messages list length for sequence number publishedAt=createdMessage.get("publishedAt", self._getCurrentTimestamp()), stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None ) except Exception as e: logger.error(f"Error creating workflow message: {str(e)}") return None def updateWorkflowMessage(self, messageId: str, messageData: Dict[str, Any]) -> Dict[str, Any]: """Updates a workflow message if user has access to the workflow.""" try: logger.debug(f"Updating message {messageId} in database") # Ensure messageId is provided if not messageId: logger.error("No messageId provided for updateWorkflowMessage") raise ValueError("messageId cannot be empty") # Check if message exists in database messages = self.db.getRecordset("workflowMessages", recordFilter={"id": messageId}) if not messages: logger.warning(f"Message with ID {messageId} does not exist in database") # If message doesn't exist but we have workflowId, create it if "workflowId" in messageData: workflowId = messageData.get("workflowId") # Check workflow access workflow = self.getWorkflow(workflowId) if not workflow: raise PermissionError(f"No access to workflow {workflowId}") if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") logger.info(f"Creating new message with ID {messageId} for workflow {workflowId}") return self.db.recordCreate("workflowMessages", messageData) else: logger.error(f"Workflow ID missing for new message {messageId}") return None # Update existing message existingMessage = messages[0] # Check workflow access workflowId = existingMessage.get("workflowId") workflow = self.getWorkflow(workflowId) if not workflow: raise PermissionError(f"No access to workflow {workflowId}") if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") # Ensure required fields present for key in ["role", "agentName"]: if key not in messageData and key not in existingMessage: messageData[key] = "assistant" if key == "role" else "" # Ensure ID is in the dataset if 'id' not in messageData: messageData['id'] = messageId # Convert createdAt to startedAt if needed if "createdAt" in messageData and "startedAt" not in messageData: messageData["startedAt"] = messageData["createdAt"] del messageData["createdAt"] # Update the message updatedMessage = self.db.recordModify("workflowMessages", messageId, messageData) if updatedMessage: logger.debug(f"Message {messageId} updated successfully") else: logger.warning(f"Failed to update message {messageId}") return updatedMessage except Exception as e: logger.error(f"Error updating message {messageId}: {str(e)}", exc_info=True) raise ValueError(f"Error updating message {messageId}: {str(e)}") def deleteWorkflowMessage(self, workflowId: str, messageId: str) -> bool: """Deletes a workflow message if user has access to the workflow.""" try: # Check workflow access workflow = self.getWorkflow(workflowId) if not workflow: logger.warning(f"No access to workflow {workflowId}") return False if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") # Check if the message exists messages = self.getWorkflowMessages(workflowId) message = next((m for m in messages if m.get("id") == messageId), None) if not message: logger.warning(f"Message {messageId} for workflow {workflowId} not found") return False # Delete the message from the database return self.db.recordDelete("workflowMessages", messageId) except Exception as e: logger.error(f"Error deleting message {messageId}: {str(e)}") return False def deleteFileFromMessage(self, workflowId: str, messageId: str, fileId: str) -> bool: """Removes a file reference from a message if user has access.""" try: # Check workflow access workflow = self.getWorkflow(workflowId) if not workflow: logger.warning(f"No access to workflow {workflowId}") return False if not self._canModify("workflows", workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") logger.debug(f"Removing file {fileId} from message {messageId} in workflow {workflowId}") # Get all workflow messages allMessages = self.getWorkflowMessages(workflowId) logger.debug(f"Workflow {workflowId} has {len(allMessages)} messages") # Try different approaches to find the message message = None # Exact match message = next((m for m in allMessages if m.get("id") == messageId), None) # Case-insensitive match if not message and isinstance(messageId, str): message = next((m for m in allMessages if isinstance(m.get("id"), str) and m.get("id").lower() == messageId.lower()), None) # Partial match (starts with) if not message and isinstance(messageId, str): message = next((m for m in allMessages if isinstance(m.get("id"), str) and m.get("id").startswith(messageId)), None) if not message: logger.warning(f"Message {messageId} not found in workflow {workflowId}") return False # Log the found message logger.debug(f"Found message: {message.get('id')}") # Check if message has documents if "documents" not in message or not message["documents"]: logger.warning(f"No documents in message {messageId}") return False # Log existing documents documents = message.get("documents", []) logger.debug(f"Message has {len(documents)} documents") # Create a new list of documents without the one to delete updatedDocuments = [] removed = False for doc in documents: docId = doc.get("id") fileIdValue = doc.get("fileId") # Flexible matching approach shouldRemove = ( (docId == fileId) or (fileIdValue == fileId) or (isinstance(docId, str) and str(fileId) in docId) or (isinstance(fileIdValue, str) and str(fileId) in fileIdValue) ) if shouldRemove: removed = True logger.debug(f"Found file to remove: docId={docId}, fileId={fileIdValue}") else: updatedDocuments.append(doc) if not removed: logger.warning(f"No matching file {fileId} found in message {messageId}") return False # Update message with modified documents array messageUpdate = { "documents": updatedDocuments } # Apply the update directly to the database updated = self.db.recordModify("workflowMessages", message["id"], messageUpdate) if updated: logger.debug(f"Successfully removed file {fileId} from message {messageId}") return True else: logger.warning(f"Failed to update message {messageId} in database") return False except Exception as e: logger.error(f"Error removing file {fileId} from message {messageId}: {str(e)}") return False # Workflow Logs def getWorkflowLogs(self, workflowId: str) -> List[ChatLog]: """Returns logs for a workflow if user has access to the workflow.""" # Check workflow access first workflow = self.getWorkflow(workflowId) if not workflow: return [] # Get logs for this workflow logs = self.db.getRecordset("workflowLogs", recordFilter={"workflowId": workflowId}) # Sort logs by timestamp (Unix timestamps) logs.sort(key=lambda x: float(x.get("timestamp", 0))) return [ChatLog(**log) for log in logs] def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool: """Updates workflow statistics during execution with incremental values.""" try: # Get current workflow workflow = self.getWorkflow(workflowId) if not workflow: logger.error(f"Workflow {workflowId} not found for stats update") return False if not self._canModify("workflows", workflowId): logger.error(f"No permission to update workflow {workflowId} stats") return False # Get current stats - ensure we have proper defaults if workflow.stats: currentStats = workflow.stats.dict() # Ensure all required fields exist currentStats.setdefault("bytesSent", 0) currentStats.setdefault("bytesReceived", 0) currentStats.setdefault("tokenCount", 0) currentStats.setdefault("processingTime", 0) else: currentStats = { "bytesSent": 0, "bytesReceived": 0, "tokenCount": 0, "processingTime": 0 } # Calculate processing time as duration since workflow start using Unix timestamps workflow = self.getWorkflow(workflowId) if workflow and workflow.startedAt: try: # Parse start time as Unix timestamp (handle both old ISO format and new Unix format) start_time_str = workflow.startedAt try: # Try to parse as Unix timestamp first start_time = int(float(start_time_str)) except ValueError: # If that fails, try to parse as ISO format and convert to Unix try: # Handle ISO format timestamps (for backward compatibility) if start_time_str.endswith('Z'): start_time_str = start_time_str.replace('Z', '+00:00') dt = datetime.fromisoformat(start_time_str) start_time = int(dt.timestamp()) except: # If all parsing fails, use current time logger.warning(f"Could not parse start time: {start_time_str}, using current time") start_time = int(time.time()) current_time = int(time.time()) processing_time = current_time - start_time # Ensure processing time is reasonable (not negative or extremely large) if processing_time < 0: logger.warning(f"Negative processing time calculated: {processing_time}, using 0") processing_time = 0 elif processing_time > 86400 * 365: # More than 1 year logger.warning(f"Unreasonably large processing time: {processing_time}, using 0") processing_time = 0 except Exception as e: logger.warning(f"Error calculating processing time: {str(e)}") processing_time = currentStats.get("processingTime", 0) or 0 else: # Fallback to existing processing time or 0 processing_time = currentStats.get("processingTime", 0) or 0 # Update stats with incremental values - ensure no None values current_bytes_sent = currentStats.get("bytesSent", 0) or 0 current_bytes_received = currentStats.get("bytesReceived", 0) or 0 currentStats["bytesSent"] = current_bytes_sent + bytesSent currentStats["bytesReceived"] = current_bytes_received + bytesReceived currentStats["tokenCount"] = currentStats["bytesSent"] + currentStats["bytesReceived"] currentStats["processingTime"] = processing_time # Update workflow in database self.db.recordModify("workflows", workflowId, { "dataStats": currentStats }) # Log to stats table stats_record = { "timestamp": self._getCurrentTimestamp(), "workflowId": workflowId, "bytesSent": bytesSent, "bytesReceived": bytesReceived, "tokenCount": bytesSent + bytesReceived, "processingTime": processing_time } # Create stats record in database self.db.recordCreate("stats", stats_record) # logger.debug(f"Updated workflow {workflowId} stats: {currentStats}") # logger.debug(f"Logged stats record: {stats_record}") return True except Exception as e: logger.error(f"Error updating workflow stats: {str(e)}") return False def createWorkflowLog(self, logData: Dict[str, Any]) -> ChatLog: """Creates a log entry for a workflow if user has access.""" # Check workflow access workflowId = logData.get("workflowId") if not workflowId: logger.error("No workflowId provided for createWorkflowLog") return None workflow = self.getWorkflow(workflowId) if not workflow: logger.warning(f"No access to workflow {workflowId}") return None if not self._canModify("workflows", workflowId): logger.warning(f"No permission to modify workflow {workflowId}") return None # Make sure required fields are present if "timestamp" not in logData: logData["timestamp"] = self._getCurrentTimestamp() # Add status information if not present if "status" not in logData and "type" in logData: if logData["type"] == "error": logData["status"] = "error" else: logData["status"] = "running" # Add progress information if not present if "progress" not in logData: # Default progress values based on log type if logData.get("type") == "info": logData["progress"] = 50 # Default middle progress elif logData.get("type") == "error": logData["progress"] = -1 # Error state elif logData.get("type") == "warning": logData["progress"] = 50 # Default middle progress # Validate log data against ChatLog model try: log_model = ChatLog(**logData) except Exception as e: logger.error(f"Invalid log data: {str(e)}") return None # Create log in database createdLog = self.db.recordCreate("workflowLogs", log_model.to_dict()) # Return validated ChatLog instance return ChatLog(**createdLog) # Workflow Management def saveWorkflowState(self, workflow: ChatWorkflow, saveMessages: bool = True, saveLogs: bool = True) -> bool: """Saves workflow state if user has access.""" try: workflowId = workflow.id if not workflowId: return False # Check workflow access existingWorkflow = self.getWorkflow(workflowId) if not existingWorkflow and not self._canModify("workflows"): logger.warning(f"No permission to create workflow {workflowId}") return False if existingWorkflow and not self._canModify("workflows", workflowId): logger.warning(f"No permission to update workflow {workflowId}") return False # Extract only the database-relevant workflow fields workflowDbData = { "id": workflowId, "mandateId": workflow.mandateId, "name": workflow.name, "status": workflow.status, "startedAt": workflow.startedAt, "lastActivity": workflow.lastActivity, "dataStats": workflow.stats.dict() if workflow.stats else {} } # Check if workflow already exists if existingWorkflow: self.updateWorkflow(workflowId, workflowDbData) else: self.createWorkflow(workflowDbData) # Save messages if saveMessages and "messages" in workflow: for message in workflow["messages"]: messageId = message.get("id") if not messageId: continue # Get existing message from database existingMessages = self.getWorkflowMessages(workflowId) existingMessage = next((m for m in existingMessages if m.get("id") == messageId), None) if existingMessage: # Check if updates are needed hasChanges = False for key in ["role", "agentName", "content", "status", "documents"]: if key in message and message.get(key) != existingMessage.get(key): hasChanges = True break if hasChanges: # Extract only relevant data for the database messageData = { "role": message.get("role", existingMessage.get("role", "unknown")), "content": message.get("content", existingMessage.get("content", "")), "agentName": message.get("agentName", existingMessage.get("agentName", "")), "status": message.get("status", existingMessage.get("status", "completed")), "documents": message.get("documents", existingMessage.get("documents", [])) } self.updateWorkflowMessage(messageId, messageData) else: # Message doesn't exist in database yet logger.warning(f"Message {messageId} in workflow {workflowId} not found in database") # Save logs if saveLogs and "logs" in workflow: # Get existing logs existingLogs = {log["id"]: log for log in self.getWorkflowLogs(workflowId)} for log in workflow["logs"]: logId = log.get("id") if not logId: continue # Extract only relevant data for the database logData = { "id": logId, "workflowId": workflowId, "message": log.get("message", ""), "type": log.get("type", "info"), "timestamp": log.get("timestamp", self._getCurrentTimestamp()), "agentName": log.get("agentName", "(undefined)"), "status": log.get("status", "running"), "progress": log.get("progress", 50) } # Create or update log if logId in existingLogs: self.db.recordModify("workflowLogs", logId, logData) else: self.db.recordCreate("workflowLogs", logData) return True except Exception as e: logger.error(f"Error saving workflow state: {str(e)}") return False def loadWorkflowState(self, workflowId: str) -> Optional[ChatWorkflow]: """Loads workflow state if user has access.""" try: # Check workflow access workflow = self.getWorkflow(workflowId) if not workflow: return None logger.debug(f"Loaded base workflow {workflowId} from database") # Load messages messages = self.getWorkflowMessages(workflowId) # Messages are already sorted by publishedAt in getWorkflowMessages messageCount = len(messages) logger.debug(f"Loaded {messageCount} messages for workflow {workflowId}") # Log document counts for each message for msg in messages: docCount = len(msg.documents) if hasattr(msg, 'documents') else 0 if docCount > 0: logger.debug(f"Message {msg.id} has {docCount} documents loaded from database") # Load logs logs = self.getWorkflowLogs(workflowId) # Logs are already sorted by timestamp in getWorkflowLogs # Create a new ChatWorkflow object with loaded messages and logs return ChatWorkflow( id=workflow.id, status=workflow.status, name=workflow.name, currentRound=workflow.currentRound, lastActivity=workflow.lastActivity, startedAt=workflow.startedAt, logs=logs, messages=messages, stats=workflow.stats, mandateId=workflow.mandateId ) except Exception as e: logger.error(f"Error loading workflow state: {str(e)}") return None # Workflow Actions async def workflowStart(self, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: """ Starts a new workflow or continues an existing one. Args: userInput: The user input request containing workflow initialization data workflowId: Optional ID of an existing workflow to continue Returns: ChatWorkflow object representing the started/continued workflow """ try: # Get current timestamp currentTime = self._getCurrentTimestamp() if workflowId: # Continue existing workflow - load complete state including messages workflow = self.loadWorkflowState(workflowId) if not workflow: raise ValueError(f"Workflow {workflowId} not found") # Update workflow - set status back to running for resumed workflows self.updateWorkflow(workflowId, { "status": "running", # Set status back to running for resumed workflows "lastActivity": currentTime, "currentRound": workflow.currentRound + 1 }) # Update the workflow object status as well workflow.status = "running" # Add log entry for workflow resumption self.createWorkflowLog({ "workflowId": workflowId, "message": f"Workflow resumed (round {workflow.currentRound + 1})", "type": "info", "status": "running", "progress": 0 }) else: # Create new workflow workflowData = { "name": "New Workflow", # Default name since UserInputRequest doesn't have a name field "status": "running", "startedAt": currentTime, "lastActivity": currentTime, "currentRound": 1, "mandateId": self.mandateId, "messageIds": [], "dataStats": { "totalMessages": 0, "totalDocuments": 0, "totalTokens": 0 } } # Create workflow workflow = self.createWorkflow(workflowData) # Initialize stats for the new workflow self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) # Remove the 'Workflow started' log entry # Start workflow processing from modules.workflow.managerWorkflow import WorkflowManager workflowManager = WorkflowManager(self, currentUser) asyncio.create_task(workflowManager.workflowProcess(userInput, workflow)) return workflow except Exception as e: logger.error(f"Error starting workflow: {str(e)}") raise async def workflowStop(self, workflowId: str) -> ChatWorkflow: """ Stops a running workflow (State 8: Workflow Stopped). Args: workflowId: ID of the workflow to stop Returns: Updated ChatWorkflow object """ try: # Load workflow state workflow = self.getWorkflow(workflowId) if not workflow: raise ValueError(f"Workflow {workflowId} not found") # Update workflow status workflow.status = "stopped" workflow.lastActivity = self._getCurrentTimestamp() # Update in database self.updateWorkflow(workflowId, { "status": "stopped", "lastActivity": workflow.lastActivity }) # Add log entry self.createWorkflowLog({ "workflowId": workflowId, "message": "Workflow stopped", "type": "warning", "status": "stopped", "progress": 100 }) return workflow except Exception as e: logger.error(f"Error stopping workflow: {str(e)}") raise # Task Management def getTask(self, taskId: str) -> Optional[TaskItem]: """Returns a task by ID if user has access.""" tasks = self.db.getRecordset("tasks", recordFilter={"id": taskId}) if not tasks: return None filteredTasks = self._uam("tasks", tasks) if not filteredTasks: return None task = filteredTasks[0] try: # Validate task data against TaskItem model return TaskItem( id=task["id"], workflowId=task["workflowId"], userInput=task.get("userInput", ""), status=task.get("status", TaskStatus.PENDING), error=task.get("error"), startedAt=task.get("startedAt"), finishedAt=task.get("finishedAt"), actionList=[TaskAction(**action) for action in task.get("actionList", [])], documentsOutput=task.get("documentsOutput", []), retryCount=task.get("retryCount", 0), retryMax=task.get("retryMax", 3), rollbackOnFailure=task.get("rollbackOnFailure", True), dependencies=task.get("dependencies", []), feedback=task.get("feedback") ) except Exception as e: logger.error(f"Error validating task data: {str(e)}") return None def getWorkflowTasks(self, workflowId: str) -> List[TaskItem]: """Returns tasks for a workflow if user has access to the workflow.""" # Check workflow access first workflow = self.getWorkflow(workflowId) if not workflow: return [] # Get tasks for this workflow tasks = self.db.getRecordset("tasks", recordFilter={"workflowId": workflowId}) return [TaskItem(**task) for task in self._uam("tasks", tasks)] def createTask(self, taskData: Dict[str, Any]) -> TaskItem: """Creates a new task if user has access to the workflow.""" try: # Ensure ID is present if "id" not in taskData or not taskData["id"]: taskData["id"] = f"task_{uuid.uuid4()}" # Check workflow access workflowId = taskData.get("workflowId") if not workflowId: logger.error("No workflowId provided for createTask") return None workflow = self.getWorkflow(workflowId) if not workflow: logger.warning(f"No access to workflow {workflowId}") return None if not self._canModify("workflows", workflowId): logger.warning(f"No permission to modify workflow {workflowId}") return None # Ensure required fields if "status" not in taskData: taskData["status"] = TaskStatus.PENDING if "startedAt" not in taskData: taskData["startedAt"] = self._getCurrentTimestamp() # Create task in database createdTask = self.db.recordCreate("tasks", taskData) # Convert to TaskItem model task = TaskItem( id=createdTask["id"], workflowId=createdTask["workflowId"], userInput=createdTask.get("userInput", ""), status=createdTask.get("status", TaskStatus.PENDING), error=createdTask.get("error"), startedAt=createdTask.get("startedAt"), finishedAt=createdTask.get("finishedAt"), actionList=[TaskAction(**action) for action in createdTask.get("actionList", [])], documentsOutput=createdTask.get("documentsOutput", []), retryCount=createdTask.get("retryCount", 0), retryMax=createdTask.get("retryMax", 3), rollbackOnFailure=createdTask.get("rollbackOnFailure", True), dependencies=createdTask.get("dependencies", []), feedback=createdTask.get("feedback") ) # Update workflow's task list workflowTasks = workflow.tasks if hasattr(workflow, 'tasks') else [] if task.id not in workflowTasks: workflowTasks.append(task.id) self.updateWorkflow(workflowId, {"tasks": workflowTasks}) return task except Exception as e: logger.error(f"Error creating task: {str(e)}") return None def updateTask(self, taskId: str, taskData: Dict[str, Any]) -> TaskItem: """Updates a task if user has access to the workflow.""" try: # Get existing task task = self.getTask(taskId) if not task: logger.warning(f"Task {taskId} not found") return None # Check workflow access workflow = self.getWorkflow(task.workflowId) if not workflow: logger.warning(f"No access to workflow {task.workflowId}") return None if not self._canModify("workflows", task.workflowId): logger.warning(f"No permission to modify workflow {task.workflowId}") return None # Update task in database updatedTask = self.db.recordModify("tasks", taskId, taskData) # Convert to TaskItem model return TaskItem( id=updatedTask["id"], workflowId=updatedTask["workflowId"], userInput=updatedTask.get("userInput", task.userInput), status=updatedTask.get("status", task.status), error=updatedTask.get("error", task.error), startedAt=updatedTask.get("startedAt", task.startedAt), finishedAt=updatedTask.get("finishedAt", task.finishedAt), actionList=[TaskAction(**action) for action in updatedTask.get("actionList", task.actionList)], documentsOutput=updatedTask.get("documentsOutput", task.documentsOutput), retryCount=updatedTask.get("retryCount", task.retryCount), retryMax=updatedTask.get("retryMax", task.retryMax), rollbackOnFailure=updatedTask.get("rollbackOnFailure", task.rollbackOnFailure), dependencies=updatedTask.get("dependencies", task.dependencies), feedback=updatedTask.get("feedback", task.feedback) ) except Exception as e: logger.error(f"Error updating task: {str(e)}") return None def deleteTask(self, taskId: str) -> bool: """Deletes a task if user has access to the workflow.""" try: # Get existing task task = self.getTask(taskId) if not task: logger.warning(f"Task {taskId} not found") return False # Check workflow access workflow = self.getWorkflow(task.workflowId) if not workflow: logger.warning(f"No access to workflow {task.workflowId}") return False if not self._canModify("workflows", task.workflowId): logger.warning(f"No permission to modify workflow {task.workflowId}") return False # Delete task if self.db.recordDelete("tasks", taskId): # Update workflow's task list workflowTasks = workflow.tasks if hasattr(workflow, 'tasks') else [] if taskId in workflowTasks: workflowTasks.remove(taskId) self.updateWorkflow(task.workflowId, {"tasks": workflowTasks}) return True return False except Exception as e: logger.error(f"Error deleting task: {str(e)}") return False # Task Result Management def createTaskResult(self, resultData: Dict[str, Any]) -> 'TaskResult': """Creates a new task result if user has access to the workflow.""" try: # Ensure ID is present if "id" not in resultData or not resultData["id"]: resultData["id"] = f"result_{uuid.uuid4()}" # Check workflow access if taskId is provided taskId = resultData.get("taskId") if taskId: task = self.getTask(taskId) if task: workflow = self.getWorkflow(task.workflowId) if not workflow: logger.warning(f"No access to workflow {task.workflowId}") return None if not self._canModify("workflows", task.workflowId): logger.warning(f"No permission to modify workflow {task.workflowId}") return None # Ensure required fields if "status" not in resultData: resultData["status"] = TaskStatus.PENDING if "success" not in resultData: resultData["success"] = False # Create result in database createdResult = self.db.recordCreate("taskResults", resultData) # Convert to TaskResult model return TaskResult( taskId=createdResult.get("taskId", ""), status=createdResult.get("status", TaskStatus.PENDING), success=createdResult.get("success", False), feedback=createdResult.get("feedback"), error=createdResult.get("error") ) except Exception as e: logger.error(f"Error creating task result: {str(e)}") return None def createActionResult(self, resultData: Dict[str, Any]) -> 'ActionResult': """Creates a new action result.""" try: # Ensure ID is present if "id" not in resultData or not resultData["id"]: resultData["id"] = f"action_result_{uuid.uuid4()}" # Ensure required fields if "success" not in resultData: resultData["success"] = False if "data" not in resultData: resultData["data"] = {} # Create result in database createdResult = self.db.recordCreate("actionResults", resultData) # Convert to ActionResult model return ActionResult( success=createdResult.get("success", False), data=createdResult.get("data", {}), metadata=createdResult.get("metadata", {}), validation=createdResult.get("validation", {}), error=createdResult.get("error") ) except Exception as e: logger.error(f"Error creating action result: {str(e)}") return None def createTaskAction(self, actionData: Dict[str, Any]) -> TaskAction: """Creates a new task action.""" try: # Ensure ID is present if "id" not in actionData or not actionData["id"]: actionData["id"] = f"action_{uuid.uuid4()}" # Ensure required fields if "status" not in actionData: actionData["status"] = TaskStatus.PENDING if "execMethod" not in actionData: logger.error("execMethod is required for task action") return None if "execAction" not in actionData: logger.error("execAction is required for task action") return None if "execParameters" not in actionData: actionData["execParameters"] = {} # Create action in database createdAction = self.db.recordCreate("taskActions", actionData) # Convert to TaskAction model return TaskAction( id=createdAction["id"], execMethod=createdAction["execMethod"], execAction=createdAction["execAction"], execParameters=createdAction.get("execParameters", {}), execResultLabel=createdAction.get("execResultLabel"), expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), status=createdAction.get("status", TaskStatus.PENDING), error=createdAction.get("error"), retryCount=createdAction.get("retryCount", 0), retryMax=createdAction.get("retryMax", 3), processingTime=createdAction.get("processingTime"), timestamp=datetime.fromtimestamp(float(createdAction.get("timestamp", time.time()))), result=createdAction.get("result"), resultDocuments=createdAction.get("resultDocuments", []) ) except Exception as e: logger.error(f"Error creating task action: {str(e)}") return None def createChatDocument(self, documentData: Dict[str, Any]) -> ChatDocument: """Creates a new ChatDocument with automatic ID generation.""" try: # Ensure ID is present if "id" not in documentData or not documentData["id"]: documentData["id"] = f"doc_{uuid.uuid4()}" # Ensure required fields if "fileId" not in documentData: logger.error("fileId is required for ChatDocument") return None if "filename" not in documentData: documentData["filename"] = "unknown" if "fileSize" not in documentData: documentData["fileSize"] = 0 if "mimeType" not in documentData: documentData["mimeType"] = "application/octet-stream" # Create ChatDocument using the model return ChatDocument( id=documentData["id"], fileId=documentData["fileId"], filename=documentData["filename"], fileSize=documentData["fileSize"], mimeType=documentData["mimeType"] ) except Exception as e: logger.error(f"Error creating ChatDocument: {str(e)}") return None def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects': """ Returns a ChatObjects instance for the current user. Handles initialization of database and records. """ if not currentUser: raise ValueError("Invalid user context: user is required") # Create context key contextKey = f"{currentUser.mandateId}_{currentUser.id}" # Create new instance if not exists if contextKey not in _chatInterfaces: _chatInterfaces[contextKey] = ChatObjects(currentUser) return _chatInterfaces[contextKey]