diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py index 504ba154..21dc9cd6 100644 --- a/modules/services/serviceAi/subDocumentGeneration.py +++ b/modules/services/serviceAi/subDocumentGeneration.py @@ -677,7 +677,7 @@ Return only the JSON response. mimeType=file_info.get("mimeType", "application/json") if file_info else "application/json" ) - # Create message referencing the file + # Create message referencing the file - include document in initial call messageData = { "workflowId": workflow.id, "role": "assistant", @@ -688,22 +688,9 @@ Return only the JSON response. "documentsLabel": label, "documents": [] } - message = services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, []) - if not message: - return - - # Persist ChatDocument with messageId - doc.messageId = message.id - services.interfaceDbChat.createDocument(doc.to_dict()) - - # Update message to include document - try: - if not message.documents: - message.documents = [] - message.documents.append(doc) - services.workflow.updateMessage(message.id, {"documents": [d.to_dict() for d in message.documents]}) - except Exception: - pass + + # Store message with document included from the start + services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, [doc]) except Exception: # Non-fatal; ignore if storage or chat creation fails return diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index 0b2851e0..589971c6 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -2,7 +2,7 @@ import logging import uuid from typing import Dict, Any, List, Optional from modules.datamodels.datamodelUam import User, UserConnection -from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat +from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat, ChatLog from modules.datamodels.datamodelChat import ChatContentExtracted from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData @@ -498,7 +498,7 @@ class WorkflowService: # === Service-level transactions (DB write-through + in-memory sync) === - def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]): + def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]) -> ChatMessage: """Persist message and documents, then bind them into in-memory workflow (replace-by-id).""" # Ensure workflowId on message messageData = dict(messageData or {}) @@ -522,7 +522,7 @@ class WorkflowService: workflow.messages.append(chatMessage) return chatMessage - def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> Any: + def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> ChatLog: """Persist ChatLog and map it into the in-memory workflow logs list.""" logData = dict(logData or {}) logData["workflowId"] = workflow.id @@ -541,7 +541,7 @@ class WorkflowService: workflow.logs.append(chatLog) return chatLog - def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> Any: + def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat: """Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list.""" try: # Create ChatStat from AiCallResponse data diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py index 1060e954..fd649337 100644 --- a/modules/workflows/processing/core/actionExecutor.py +++ b/modules/workflows/processing/core/actionExecutor.py @@ -159,7 +159,8 @@ class ActionExecutor: # Create database log entry for action failure (write-through + bind) self.services.workflow.storeLog(workflow, { "message": f"āŒ **Task {taskNum}**āŒ **Action {actionNum}/{totalActions}** failed: {result.error}", - "type": "error" + "type": "error", + "progress": 100 }) # Log action summary diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py index 1bf75fb7..fe543bb3 100644 --- a/modules/workflows/processing/core/messageCreator.py +++ b/modules/workflows/processing/core/messageCreator.py @@ -67,7 +67,7 @@ class MessageCreator: "taskProgress": "pending" } - message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) + self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) logger.info("Task plan message created successfully") except Exception as e: logger.error(f"Error creating task plan message: {str(e)}") @@ -101,7 +101,7 @@ class MessageCreator: if taskStep.userMessage: taskStartMessage["message"] += f"\n\nšŸ’¬ {taskStep.userMessage}" - message = self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, []) + self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, []) logger.info(f"Task start message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating task start message: {str(e)}") @@ -182,12 +182,10 @@ class MessageCreator: logger.info(f"Creating ERROR message: {messageText}") logger.info(f"Message data: {messageData}") - message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments) + self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments) logger.info(f"Message created: {action.execMethod}.{action.execAction}") - return message except Exception as e: logger.error(f"Error creating action message: {str(e)}") - return None async def createTaskCompletionMessage(self, taskStep: TaskStep, workflow: ChatWorkflow, taskIndex: int, totalTasks: int, reviewResult: ReviewResult): @@ -227,7 +225,7 @@ class MessageCreator: "taskProgress": "success" } - message = self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, []) + self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, []) logger.info(f"Task completion message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating task completion message: {str(e)}") @@ -254,7 +252,7 @@ class MessageCreator: "taskProgress": "retry" } - message = self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, []) + self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, []) logger.info(f"Retry message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating retry message: {str(e)}") @@ -293,7 +291,7 @@ class MessageCreator: "taskProgress": "fail" } - message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) + self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) logger.info(f"Error message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating error message: {str(e)}") diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py index 0503e56a..36ca90d9 100644 --- a/modules/workflows/processing/modes/modeReact.py +++ b/modules/workflows/processing/modes/modeReact.py @@ -147,12 +147,6 @@ class ReactMode(BaseMode): context.improvements = [] context.improvements.append(f"Step {step}: {decision.get('reason')}") - # Telemetry: simple duration per step - duration = time.time() - t0 - self.services.workflow.storeLog(workflow, { - "message": f"react_step_duration_sec={duration:.3f}", - "type": "info" - }) lastReviewDict = decision # Create user-friendly message AFTER action execution diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index b47bfb97..88899e49 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -142,10 +142,10 @@ class WorkflowManager: self.services.rawUserPrompt = userInput.prompt self.services.currentUserPrompt = userInput.prompt self.workflowProcessor = WorkflowProcessor(self.services, workflow) - message = await self._sendFirstMessage(userInput, workflow) + await self._sendFirstMessage(userInput, workflow) task_plan = await self._planTasks(userInput, workflow) await self._executeTasks(task_plan, workflow) - await self._processWorkflowResults(workflow, message) + await self._processWorkflowResults(workflow) except WorkflowStoppedException: self._handleWorkflowStop(workflow) @@ -155,7 +155,7 @@ class WorkflowManager: # Helper functions - async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage: + async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: """Send first message to start workflow""" try: self.workflowProcessor._checkWorkflowStopped(workflow) @@ -270,7 +270,7 @@ class WorkflowManager: except Exception: pass - # Create documents for context items (in-memory ChatDocument; persistence via storeMessageWithDocuments) + # Create documents for context items if contextItems and isinstance(contextItems, list): for idx, item in enumerate(contextItems): try: @@ -317,8 +317,7 @@ class WorkflowManager: logger.warning(f"Failed to process user fileIds: {e}") # Finally, persist and bind the first message with combined documents (context + user) - created_message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs) - return created_message + self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs) except Exception as e: logger.error(f"Error sending first message: {str(e)}") @@ -385,7 +384,7 @@ class WorkflowManager: workflow.status = "completed" return None - async def _processWorkflowResults(self, workflow: ChatWorkflow, initial_message: ChatMessage) -> None: + async def _processWorkflowResults(self, workflow: ChatWorkflow) -> None: """Process workflow results based on workflow status and create appropriate messages""" try: try: @@ -567,9 +566,7 @@ class WorkflowManager: } # Create message using interface - message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) - if message: - self.services.workflow.storeMessageWithDocuments(workflow, message.__dict__, getattr(message, 'documents', [])) + self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) # Update workflow status to completed workflow.status = "completed"