""" Interface to LucyDOM database and AI Connectors. Uses the JSON connector for data access with added language support. """ import os import logging import uuid from datetime import datetime, UTC, timezone from typing import Dict, Any, List, Optional, Union, get_origin, get_args import asyncio from modules.interfaces.interfaceDbChatAccess import ChatAccess from modules.datamodels.datamodelChat import ( ActionItem, TaskResult, TaskItem, TaskStatus, ActionResult ) from modules.datamodels.datamodelChat import ( UserInputRequest, ChatDocument, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) from modules.datamodels.datamodelUam import User # DYNAMIC PART: Connectors to the Interface from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.timezoneUtils import get_utc_timestamp # Basic Configurations from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) # Singleton factory for Chat instances _chatInterfaces = {} class ChatObjects: """ Interface to Chat database and AI Connectors. Uses the JSON connector for data access with added language support. 
""" def __init__(self, currentUser: Optional[User] = None): """Initializes the Chat Interface.""" # Initialize variables self.currentUser = currentUser # Store User object directly self.userId = currentUser.id if currentUser else None self.mandateId = currentUser.mandateId if currentUser else None self.access = None # Will be set when user context is provided # Initialize services self._initializeServices() # Initialize database self._initializeDatabase() # Set user context if provided if currentUser: self.setUserContext(currentUser) # ===== Generic Utility Methods ===== def _is_object_field(self, field_type) -> bool: """Check if a field type represents a complex object (not a simple type).""" # Simple scalar types if field_type in (str, int, float, bool, type(None)): return False # Everything else is an object return True def _separate_object_fields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]: """Separate simple fields from object fields based on Pydantic model structure.""" simple_fields = {} object_fields = {} # Get field information from the Pydantic model model_fields = {} if hasattr(model_class, '__fields__'): model_fields = model_class.__fields__ elif hasattr(model_class, 'model_fields'): model_fields = model_class.model_fields for field_name, value in data.items(): # Check if this field should be stored as JSONB in the database if field_name in model_fields: field_info = model_fields[field_name] # Handle both Pydantic v1 and v2 if hasattr(field_info, 'type_'): field_type = field_info.type_ # Pydantic v1 elif hasattr(field_info, 'annotation'): field_type = field_info.annotation # Pydantic v2 else: field_type = type(value) # Fallback # Always route relational/object fields to object_fields for separate handling if field_name in ['documents', 'stats']: object_fields[field_name] = value continue # Check if this is a JSONB field (Dict, List, or complex types) if (field_type == dict or field_type == list or 
(hasattr(field_type, '__origin__') and field_type.__origin__ in (dict, list)) or field_name in ['execParameters', 'expectedDocumentFormats', 'resultDocuments']): # Store as JSONB - include in simple_fields for database storage simple_fields[field_name] = value elif isinstance(value, (str, int, float, bool, type(None))): # Simple scalar types simple_fields[field_name] = value else: # Complex objects that should be filtered out object_fields[field_name] = value else: # Field not in model - treat as scalar if simple, otherwise filter out if isinstance(value, (str, int, float, bool, type(None))): simple_fields[field_name] = value else: object_fields[field_name] = value return simple_fields, object_fields def _initializeServices(self): pass def setUserContext(self, currentUser: User): """Sets the user context for the interface.""" self.currentUser = currentUser # Store User object directly self.userId = currentUser.id self.mandateId = currentUser.mandateId if not self.userId or not self.mandateId: raise ValueError("Invalid user context: id and mandateId are required") # Add language settings self.userLanguage = currentUser.language # Default user language # Initialize access control with user context self.access = ChatAccess(self.currentUser, self.db) # Convert to dict only when needed # Update database context self.db.updateContext(self.userId) def __del__(self): """Cleanup method to close database connection.""" if hasattr(self, 'db') and self.db is not None: try: self.db.close() except Exception as e: logger.error(f"Error closing database connection: {e}") def _initializeDatabase(self): """Initializes the database connection directly.""" try: # Get configuration values with defaults dbHost = APP_CONFIG.get("DB_CHAT_HOST", "_no_config_default_data") dbDatabase = APP_CONFIG.get("DB_CHAT_DATABASE", "chat") dbUser = APP_CONFIG.get("DB_CHAT_USER") dbPassword = APP_CONFIG.get("DB_CHAT_PASSWORD_SECRET") dbPort = int(APP_CONFIG.get("DB_CHAT_PORT", 5432)) # Create database 
def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]:
    """Return a workflow with its logs, messages and stats loaded.

    Args:
        workflowId: ID of the workflow to load.

    Returns:
        The ChatWorkflow, or None when the record does not exist, is filtered
        out by access control, or fails model validation.
    """
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return None

    filteredWorkflows = self._uam(ChatWorkflow, workflows)
    if not filteredWorkflows:
        return None

    workflow = filteredWorkflows[0]

    # createWorkflow() persists and returns these fields; forward them when
    # they are stored so a reloaded workflow does not fall back to the model
    # defaults. Null or missing values are left to the model.
    optionalFields = {
        key: workflow[key]
        for key in ("workflowMode", "maxSteps")
        if workflow.get(key) is not None
    }

    try:
        # Load related data from normalized tables
        logs = self.getLogs(workflowId)
        messages = self.getMessages(workflowId)
        stats = self.getWorkflowStats(workflowId)

        # Validate workflow data against ChatWorkflow model
        return ChatWorkflow(
            id=workflow["id"],
            status=workflow.get("status", "running"),
            name=workflow.get("name"),
            currentRound=workflow.get("currentRound", 0),
            currentTask=workflow.get("currentTask", 0),
            currentAction=workflow.get("currentAction", 0),
            totalTasks=workflow.get("totalTasks", 0),
            totalActions=workflow.get("totalActions", 0),
            lastActivity=workflow.get("lastActivity", get_utc_timestamp()),
            startedAt=workflow.get("startedAt", get_utc_timestamp()),
            logs=logs,
            messages=messages,
            stats=stats,
            mandateId=workflow.get("mandateId", self.currentUser.mandateId),
            **optionalFields,
        )
    except Exception as e:
        logger.error(f"Error validating workflow data: {str(e)}")
        return None
def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> Optional[ChatWorkflow]:
    """Update a workflow and write its related logs, messages and stats.

    Args:
        workflowId: ID of the workflow to update.
        workflowData: Fields to change. ``logs``, ``messages`` and ``stats``
            entries are written to their own tables when the field
            separation classifies them as object fields.

    Returns:
        The refreshed ChatWorkflow, or None when the workflow does not exist
        or is not visible to the current user.

    Raises:
        PermissionError: If the user may not modify the workflow.
    """
    def _asDict(item):
        # Accept Pydantic v2/v1 models, objects with to_dict(), or plain dicts.
        if hasattr(item, 'model_dump'):
            return item.model_dump()
        if hasattr(item, 'dict'):
            return item.dict()
        if hasattr(item, 'to_dict'):
            return item.to_dict()
        return item

    # Check if the workflow exists and user has access
    workflow = self.getWorkflow(workflowId)
    if not workflow:
        return None

    if not self._canModify(ChatWorkflow, workflowId):
        raise PermissionError(f"No permission to update workflow {workflowId}")

    simple_fields, object_fields = self._separate_object_fields(ChatWorkflow, workflowData)

    # Every update counts as activity on the workflow
    simple_fields["lastActivity"] = get_utc_timestamp()
    updated = self.db.recordModify(ChatWorkflow, workflowId, simple_fields)

    # Related data is best-effort: a failure is logged and the update goes on.
    if 'logs' in object_fields:
        try:
            for log_data in object_fields['logs']:
                log_dict = _asDict(log_data)
                log_dict["workflowId"] = workflowId
                self.createLog(log_dict)
        except Exception as e:
            logger.error(f"Error updating workflow logs: {str(e)}")

    if 'messages' in object_fields:
        try:
            for message_data in object_fields['messages']:
                msg_dict = _asDict(message_data)
                msg_dict["workflowId"] = workflowId
                self.updateMessage(msg_dict.get("id"), msg_dict)
        except Exception as e:
            logger.error(f"Error updating workflow messages: {str(e)}")

    if 'stats' in object_fields:
        try:
            stats_data = object_fields['stats']
            if stats_data:
                # Convert first: item assignment on a model instance raises
                # and the stats would be dropped with only a log entry.
                stats_dict = _asDict(stats_data)
                stats_dict["workflowId"] = workflowId
                self.db.recordCreate(ChatStat, stats_dict)
        except Exception as e:
            logger.error(f"Error updating workflow stats: {str(e)}")

    # Load fresh data from normalized tables
    logs = self.getLogs(workflowId)
    messages = self.getMessages(workflowId)
    stats = self.getWorkflowStats(workflowId)

    # Forward the fields createWorkflow() also returns, when they are stored.
    optionalFields = {
        key: updated[key]
        for key in ("workflowMode", "maxSteps")
        if updated.get(key) is not None
    }

    return ChatWorkflow(
        id=updated["id"],
        status=updated.get("status", workflow.status),
        name=updated.get("name", workflow.name),
        currentRound=updated.get("currentRound", workflow.currentRound),
        currentTask=updated.get("currentTask", workflow.currentTask),
        currentAction=updated.get("currentAction", workflow.currentAction),
        totalTasks=updated.get("totalTasks", workflow.totalTasks),
        totalActions=updated.get("totalActions", workflow.totalActions),
        lastActivity=updated.get("lastActivity", workflow.lastActivity),
        startedAt=updated.get("startedAt", workflow.startedAt),
        logs=logs,
        messages=messages,
        stats=stats,
        mandateId=updated.get("mandateId", workflow.mandateId),
        **optionalFields,
    )
def getMessages(self, workflowId: str) -> List[ChatMessage]:
    """Return the messages of a workflow in chronological order.

    An empty list is returned when the workflow does not exist or is
    filtered out by access control. Each message is returned with its
    documents and its stats loaded from the normalized tables.
    """
    # Access is checked on the raw record rather than through getWorkflow(),
    # because getWorkflow() itself calls this method.
    workflowRecords = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflowRecords or not self._uam(ChatWorkflow, workflowRecords):
        return []

    records = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})

    def _orderKey(record):
        # publishedAt first, then timestamp, then a constant fallback
        return record.get("publishedAt", record.get("timestamp", "0"))

    records.sort(key=_orderKey)

    def _toModel(record):
        # Documents are read before the message model is assembled
        attachedDocs = self.getDocuments(record["id"])
        return ChatMessage(
            id=record["id"],
            workflowId=record["workflowId"],
            parentMessageId=record.get("parentMessageId"),
            documents=attachedDocs,
            documentsLabel=record.get("documentsLabel"),
            message=record.get("message"),
            role=record.get("role", "assistant"),
            status=record.get("status", "step"),
            sequenceNr=record.get("sequenceNr", 0),
            publishedAt=record.get("publishedAt", get_utc_timestamp()),
            stats=self.getMessageStats(record["id"]),
            success=record.get("success"),
            actionId=record.get("actionId"),
            actionMethod=record.get("actionMethod"),
            actionName=record.get("actionName"),
            roundNumber=record.get("roundNumber"),
            taskNumber=record.get("taskNumber"),
            actionNumber=record.get("actionNumber"),
            taskProgress=record.get("taskProgress"),
            actionProgress=record.get("actionProgress"),
        )

    return [_toModel(record) for record in records]
if not provided # This ensures messages have the correct progress context when workflows are continued if "roundNumber" not in messageData: messageData["roundNumber"] = workflow.currentRound if "taskNumber" not in messageData: messageData["taskNumber"] = workflow.currentTask if "actionNumber" not in messageData: messageData["actionNumber"] = workflow.currentAction # Use generic field separation based on ChatMessage model simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData) # Handle documents separately - they will be stored in normalized documents table documents_to_create = object_fields.get("documents", []) # Create message in normalized table using only simple fields createdMessage = self.db.recordCreate(ChatMessage, simple_fields) # Create documents in normalized documents table created_documents = [] for doc_data in documents_to_create: # Convert to dict if it's a Pydantic object if hasattr(doc_data, 'model_dump'): doc_dict = doc_data.model_dump() # Pydantic v2 elif hasattr(doc_data, 'dict'): doc_dict = doc_data.dict() # Pydantic v1 elif hasattr(doc_data, 'to_dict'): doc_dict = doc_data.to_dict() else: doc_dict = doc_data doc_dict["messageId"] = createdMessage["id"] created_doc = self.createDocument(doc_dict) if created_doc: created_documents.append(created_doc) # Convert to ChatMessage model chat_message = ChatMessage( id=createdMessage["id"], workflowId=createdMessage["workflowId"], parentMessageId=createdMessage.get("parentMessageId"), agentName=createdMessage.get("agentName"), documents=created_documents, documentsLabel=createdMessage.get("documentsLabel"), message=createdMessage.get("message"), role=createdMessage.get("role", "assistant"), status=createdMessage.get("status", "step"), sequenceNr=len(workflow.messages) + 1, # Use messages list length for sequence number publishedAt=createdMessage.get("publishedAt", get_utc_timestamp()), stats=object_fields.get("stats"), # Use stats from object_fields 
roundNumber=createdMessage.get("roundNumber"), taskNumber=createdMessage.get("taskNumber"), actionNumber=createdMessage.get("actionNumber"), success=createdMessage.get("success"), actionId=createdMessage.get("actionId"), actionMethod=createdMessage.get("actionMethod"), actionName=createdMessage.get("actionName") ) # Debug: Store message and documents for debugging - only if debug enabled debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) if debug_enabled: self._storeDebugMessageAndDocuments(chat_message) return chat_message except Exception as e: logger.error(f"Error creating workflow message: {str(e)}") return None def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Dict[str, Any]: """Updates a workflow message if user has access to the workflow.""" try: # Ensure messageId is provided if not messageId: logger.error("No messageId provided for updateMessage") raise ValueError("messageId cannot be empty") # Check if message exists in database messages = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId}) if not messages: logger.warning(f"Message with ID {messageId} does not exist in database") # If message doesn't exist but we have workflowId, create it if "workflowId" in messageData: workflowId = messageData.get("workflowId") # Check workflow access workflow = self.getWorkflow(workflowId) if not workflow: raise PermissionError(f"No access to workflow {workflowId}") if not self._canModify(ChatWorkflow, workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") logger.info(f"Creating new message with ID {messageId} for workflow {workflowId}") return self.db.recordCreate(ChatMessage, messageData) else: logger.error(f"Workflow ID missing for new message {messageId}") return None # Update existing message existingMessage = messages[0] # Check workflow access workflowId = existingMessage.get("workflowId") workflow = self.getWorkflow(workflowId) if not workflow: raise PermissionError(f"No 
def deleteMessage(self, workflowId: str, messageId: str) -> bool:
    """Delete a workflow message together with its stats and document rows.

    Args:
        workflowId: ID of the workflow the message belongs to.
        messageId: ID of the message to delete.

    Returns:
        The result of the message delete, or False when the workflow is not
        accessible, the message is not part of the workflow, or an error
        (including a missing modify permission) occurs.
    """
    try:
        # Check workflow access
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False

        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Look the record up directly. getMessages() returns ChatMessage
        # models, which have no .get(), and it loads documents and stats for
        # every message of the workflow just to test for existence.
        candidates = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId})
        if not any(record.get("workflowId") == workflowId for record in candidates):
            logger.warning(f"Message {messageId} for workflow {workflowId} not found")
            return False

        # CASCADE DELETE: related rows first, the message itself last
        for stat in self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId}):
            self.db.recordDelete(ChatStat, stat["id"])

        # Only the document rows are removed, NOT the underlying files
        for doc in self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId}):
            self.db.recordDelete(ChatDocument, doc["id"])

        return self.db.recordDelete(ChatMessage, messageId)
    except Exception as e:
        logger.error(f"Error deleting message {messageId}: {str(e)}")
        return False
def getLogs(self, workflowId: str) -> List[ChatLog]:
    """Return the log entries of a workflow ordered by timestamp.

    An empty list is returned when the workflow does not exist or is
    filtered out by access control.
    """
    # Access is checked on the raw record rather than through getWorkflow(),
    # because getWorkflow() itself calls this method.
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return []

    if not self._uam(ChatWorkflow, workflows):
        return []

    logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})

    # A stored null or empty timestamp sorts as 0 instead of raising in
    # float(), which would otherwise make getWorkflow() return None.
    logs.sort(key=lambda entry: float(entry.get("timestamp") or 0))

    return [ChatLog(**log) for log in logs]
def getMessageStats(self, messageId: str) -> Optional[ChatStat]:
    """Return the most recent statistics record of a message.

    Args:
        messageId: ID of the message whose stats are requested.

    Returns:
        The newest ChatStat, or None when there is none or an error occurs.
    """
    def _recency(record):
        # getUnifiedChatData() reads the creation time of ChatStat records
        # from "_createdAt"; "created_at" is kept as a fallback. Records
        # without either value sort as oldest.
        for key in ("_createdAt", "created_at"):
            value = record.get(key)
            if value is not None and value != "":
                return (1, value)
        return (0, 0)

    try:
        stats = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
        if not stats:
            return None

        # max() keeps the first record on ties, like the stable reverse sort
        return ChatStat(**max(stats, key=_recency))
    except Exception as e:
        logger.error(f"Error getting message stats: {str(e)}")
        return None
def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]:
    """Return messages, logs and stats of a workflow as one chronological list.

    Args:
        workflowId: ID of the workflow to read.
        afterTimestamp: When given, only items with a timestamp strictly
            greater than this value are returned (used for polling).

    Returns:
        ``{"items": [...]}`` where every item is a dict with ``type``
        ("message", "log" or "stat"), ``createdAt`` and the dumped model
        under ``item``. The list is empty when the workflow does not exist
        or is filtered out by access control.
    """
    # Check workflow access first
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return {"items": []}

    if not self._uam(ChatWorkflow, workflows):
        return {"items": []}

    def _stampOf(record, key):
        # A stored null must behave like a missing key; otherwise the
        # comparison below and the final sort raise TypeError.
        value = record.get(key)
        return get_utc_timestamp() if value is None else value

    def _isStale(stamp):
        return afterTimestamp is not None and stamp <= afterTimestamp

    def _dump(model):
        return model.model_dump() if hasattr(model, 'model_dump') else model.dict()

    # Timestamp filtering happens in Python: all rows are fetched per table.
    items = []

    for msg in self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId}):
        msgTimestamp = _stampOf(msg, "publishedAt")
        if _isStale(msgTimestamp):
            continue

        documents = self.getDocuments(msg["id"])
        chatMessage = ChatMessage(
            id=msg["id"],
            workflowId=msg["workflowId"],
            parentMessageId=msg.get("parentMessageId"),
            documents=documents,
            documentsLabel=msg.get("documentsLabel"),
            message=msg.get("message"),
            role=msg.get("role", "assistant"),
            status=msg.get("status", "step"),
            sequenceNr=msg.get("sequenceNr", 0),
            publishedAt=msgTimestamp,
            stats=self.getMessageStats(msg["id"]),
            success=msg.get("success"),
            actionId=msg.get("actionId"),
            actionMethod=msg.get("actionMethod"),
            actionName=msg.get("actionName"),
            roundNumber=msg.get("roundNumber"),
            taskNumber=msg.get("taskNumber"),
            actionNumber=msg.get("actionNumber"),
            taskProgress=msg.get("taskProgress"),
            actionProgress=msg.get("actionProgress"),
        )
        items.append({"type": "message", "createdAt": msgTimestamp, "item": _dump(chatMessage)})

    for log in self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId}):
        logTimestamp = _stampOf(log, "timestamp")
        if _isStale(logTimestamp):
            continue
        items.append({"type": "log", "createdAt": logTimestamp, "item": _dump(ChatLog(**log))})

    for stat in self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId}):
        statTimestamp = _stampOf(stat, "_createdAt")
        if _isStale(statTimestamp):
            continue
        items.append({"type": "stat", "createdAt": statTimestamp, "item": _dump(ChatStat(**stat))})

    # Sort all items by createdAt timestamp for chronological order
    items.sort(key=lambda entry: entry["createdAt"])

    return {"items": items}
model_dump() for compatibility message_file = os.path.join(message_path, "message.json") with open(message_file, "w", encoding="utf-8") as f: # Convert message to dict manually to avoid model_dump() issues message_dict = { "id": message.id, "workflowId": message.workflowId, "parentMessageId": message.parentMessageId, "message": message.message, "role": message.role, "status": message.status, "sequenceNr": message.sequenceNr, "publishedAt": message.publishedAt, "roundNumber": message.roundNumber, "taskNumber": message.taskNumber, "actionNumber": message.actionNumber, "documentsLabel": message.documentsLabel, "actionId": message.actionId, "actionMethod": message.actionMethod, "actionName": message.actionName, "success": message.success, "documents": [] } json.dump(message_dict, f, indent=2, ensure_ascii=False, default=str) # Store message content as text if message.message: message_text_file = os.path.join(message_path, "message_text.txt") with open(message_text_file, "w", encoding="utf-8") as f: f.write(str(message.message)) # Store documents if provided if message.documents and len(message.documents) > 0: logger.info(f"Debug: Processing {len(message.documents)} documents") # Group documents by documentsLabel documents_by_label = {} for doc in message.documents: label = message.documentsLabel or 'default' if label not in documents_by_label: documents_by_label[label] = [] documents_by_label[label].append(doc) # Create subfolder for each document label for label, docs in documents_by_label.items(): # Sanitize label for filesystem safe_label = "".join(c for c in str(label) if c.isalnum() or c in (' ', '-', '_')).rstrip() safe_label = safe_label.replace(' ', '_') if not safe_label: safe_label = "default" label_folder = os.path.join(message_path, safe_label) os.makedirs(label_folder, exist_ok=True) logger.info(f"Debug: Created document folder: {label_folder}") # Store each document for i, doc in enumerate(docs): # Create document metadata file doc_meta = { "id": doc.id, 
"messageId": doc.messageId, "fileId": doc.fileId, "fileName": doc.fileName, "fileSize": doc.fileSize, "mimeType": doc.mimeType, "roundNumber": doc.roundNumber, "taskNumber": doc.taskNumber, "actionNumber": doc.actionNumber, "actionId": doc.actionId } doc_meta_file = os.path.join(label_folder, f"document_{i+1:03d}_metadata.json") with open(doc_meta_file, "w", encoding="utf-8") as f: json.dump(doc_meta, f, indent=2, ensure_ascii=False, default=str) logger.info(f"Debug: Stored document metadata for {doc.fileName}") # Also store the actual file bytes next to metadata for debugging try: # Lazy import to avoid circular deps at module load from modules.interfaces import interfaceDbComponentObjects as comp componentInterface = comp.getInterface(self.currentUser) file_bytes = componentInterface.getFileData(doc.fileId) if file_bytes: # Build a safe filename preserving original name safe_name = doc.fileName or f"document_{i+1:03d}" # Avoid path traversal safe_name = os.path.basename(safe_name) doc_file_path = os.path.join(label_folder, f"document_{i+1:03d}_" + safe_name) with open(doc_file_path, "wb") as df: df.write(file_bytes) logger.info(f"Debug: Stored document file bytes: {doc_file_path} ({len(file_bytes)} bytes)") else: logger.warning(f"Debug: No file bytes returned for fileId {doc.fileId}") except Exception as e: logger.error(f"Debug: Failed to store document file for {doc.fileName} (fileId {doc.fileId}): {e}") logger.info(f"Debug: Stored message and documents in {message_path}") except Exception as e: logger.error(f"Debug: Failed to store message and documents: {e}") import traceback logger.error(f"Debug: Traceback: {traceback.format_exc()}") def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects': """ Returns a ChatObjects instance for the current user. Handles initialization of database and records. 
""" if not currentUser: raise ValueError("Invalid user context: user is required") # Create context key contextKey = f"{currentUser.mandateId}_{currentUser.id}" # Create new instance if not exists if contextKey not in _chatInterfaces: _chatInterfaces[contextKey] = ChatObjects(currentUser) return _chatInterfaces[contextKey]