From 548762c4464040c3c8139a92767e24e724ac91e3 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sat, 30 Aug 2025 09:25:24 +0200 Subject: [PATCH] ui cleanup --- modules/chat/documents/documentGeneration.py | 2 +- modules/chat/handling/handlingTasks.py | 273 ++++++++++++++++--- modules/chat/handling/promptFactory.py | 2 +- modules/chat/serviceCenter.py | 22 +- modules/interfaces/interfaceChatModel.py | 28 +- modules/interfaces/interfaceChatObjects.py | 46 +++- modules/routes/routeWorkflows.py | 2 +- modules/workflow/managerWorkflow.py | 135 ++++++++- 8 files changed, 431 insertions(+), 79 deletions(-) diff --git a/modules/chat/documents/documentGeneration.py b/modules/chat/documents/documentGeneration.py index fac57ca0..5534462a 100644 --- a/modules/chat/documents/documentGeneration.py +++ b/modules/chat/documents/documentGeneration.py @@ -143,7 +143,7 @@ class DocumentGenerator: workflow_context = self.service.getWorkflowContext() workflow_stats = self.service.getWorkflowStats() - current_round = workflow_context.get('currentRound', 1) + current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 0) current_action = workflow_context.get('currentAction', 0) diff --git a/modules/chat/handling/handlingTasks.py b/modules/chat/handling/handlingTasks.py index eb8b04f4..2d7882d5 100644 --- a/modules/chat/handling/handlingTasks.py +++ b/modules/chat/handling/handlingTasks.py @@ -65,14 +65,16 @@ class HandlingTasks: logger.info(f"User Input: {userInput}") available_docs = self.service.getAvailableDocuments(workflow) - # Set initial workflow context - increment round if this is a new workflow + # Set initial workflow context - handle new workflow vs continuation properly current_round = getattr(workflow, 'currentRound', 0) if current_round == 0: - # First time workflow, start with round 1 + # New workflow session, start with round 1 self.service.setWorkflowContext(round_number=1, task_number=0, action_number=0) else: - # Existing workflow, increment to next round - self.service.incrementWorkflowContext('round') + # This should not happen for a new workflow - reset to ensure clean state + logger.warning(f"Workflow has currentRound={current_round} but should be 0 for new workflow. Resetting...") + self.resetWorkflowForNewSession() + self.service.setWorkflowContext(round_number=1, task_number=0, action_number=0) # Check workflow status before calling AI service self._checkWorkflowStopped() @@ -209,7 +211,7 @@ class HandlingTasks: if total_tasks == 0: raise ValueError("Task plan contains no valid tasks") - self.service.setWorkflowTotals(total_tasks=total_tasks) + self.setWorkflowTotals(total_tasks=total_tasks) logger.info(f"Task plan generated successfully with {len(tasks)} tasks") logger.info(f"Workflow user language set to: {user_language}") @@ -255,15 +257,22 @@ class HandlingTasks: "publishedAt": get_utc_timestamp(), "documentsLabel": "task_plan", "documents": [], - # Add workflow context fields - "roundNumber": 1, # Task plan is always round 1 - "taskNumber": 0, # Task plan is before individual tasks - "actionNumber": 0 + # Add workflow context fields - use current workflow round instead of hardcoded 1 + "roundNumber": workflow.currentRound, # Use current workflow round + "taskNumber": 1, # Task plan is before individual tasks; to keep 1, that UI not filtering the message + "actionNumber": 0, + # Add task progress status + "taskProgress": "pending" } message = self.chatInterface.createWorkflowMessage(message_data) if message: workflow.messages.append(message) + + # PHASE 4: Update workflow object after task plan created + # Set currentTask=1, currentAction=0, totalTasks=len(task_plan.tasks), totalActions=0 + self.updateWorkflowAfterTaskPlanCreated(len(task_plan.tasks)) + logger.info(f"Task plan message created with {len(task_plan.tasks)} tasks") else: logger.error("Failed to create task plan message") @@ -283,7 +292,7 @@ class HandlingTasks: message_text += f"**Total Documents:** {len(documents)}\n\n" # Add workflow context information - current_round = workflow_context.get('currentRound', 1) + current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 0) total_tasks = workflow_stats.get('totalTasks', 0) current_action = workflow_context.get('currentAction', 0) @@ -323,9 +332,12 @@ class HandlingTasks: "documentsLabel": "document_context", "documents": [], # Empty documents for context message # Add workflow context fields - "roundNumber": workflow_context.get('currentRound', 1), + "roundNumber": workflow_context.get('currentRound', 0), "taskNumber": workflow_context.get('currentTask', 0), - "actionNumber": workflow_context.get('currentAction', 0) + "actionNumber": workflow_context.get('currentAction', 0), + # Add progress status + "taskProgress": "pending", + "actionProgress": "pending" } message = self.chatInterface.createWorkflowMessage(message_data) @@ -533,6 +545,11 @@ class HandlingTasks: """Execute all actions for a task step, with state management and retries.""" logger.info(f"=== STARTING TASK {task_index or '?'}: {task_step.objective} ===") + # PHASE 4: Update workflow object before executing task + # Set currentTask=task_number, currentAction=0, totalActions=0 + if task_index is not None: + self.updateWorkflowBeforeExecutingTask(task_index) + # Update workflow context for this task if task_index is not None: self.service.setWorkflowContext(task_number=task_index) @@ -565,9 +582,11 @@ class HandlingTasks: "documentsLabel": f"task_{task_index}_start", "documents": [], # Add workflow context fields - "roundNumber": 1, # Task start is always round 1 + "roundNumber": workflow.currentRound, # Use current workflow round "taskNumber": task_index, - "actionNumber": 0 + "actionNumber": 0, + # Add task progress status + "taskProgress": "running" } # Add user-friendly message if available @@ -593,17 +612,22 @@ class HandlingTasks: retry_context.retry_count = attempt + 1 actions = await self.generateTaskActions(task_step, workflow, previous_results=retry_context.previous_results, enhanced_context=retry_context) + + # Log total actions count for this task + total_actions = len(actions) if actions else 0 + logger.info(f"Task {task_index or '?'} has {total_actions} actions") + + # PHASE 4: Update workflow object after action planning + # Set totalActions=extracted_total_actions for THIS task + self.updateWorkflowAfterActionPlanning(total_actions) + + # Set workflow action total for this task (0 if no actions generated) + self.setWorkflowTotals(total_actions=total_actions) + if not actions: logger.error("No actions defined for task step, aborting task execution") break - # Log total actions count for this task - total_actions = len(actions) - logger.info(f"Task {task_index or '?'} has {total_actions} actions") - - # Set workflow action total for this task - self.service.setWorkflowTotals(total_actions=total_actions) - # Create document context message if documents are available available_docs = self.service.getAvailableDocuments(workflow) if available_docs: @@ -614,8 +638,12 @@ class HandlingTasks: # Check workflow status before each action execution self._checkWorkflowStopped() - # Update workflow context for this action + # PHASE 4: Update workflow object before executing action + # Set currentAction=action_number action_number = action_idx + 1 + self.updateWorkflowBeforeExecutingAction(action_number) + + # Update workflow context for this action self.service.setWorkflowContext(action_number=action_number) # Remove the increment call that causes double-increment bug @@ -638,16 +666,18 @@ class HandlingTasks: "sequenceNr": len(workflow.messages) + 1, "publishedAt": get_utc_timestamp(), "documentsLabel": f"action_{action_number}_start", - "documents": [] + "documents": [], + # Add action progress status + "actionProgress": "running" } # Add user-friendly message if available if action.userMessage: action_start_message["message"] += f"\n\n💬 {action.userMessage}" - # Add workflow context fields + # Add workflow context fields - use current workflow round instead of hardcoded 1 action_start_message.update({ - "roundNumber": 1, # Action start is always round 1 + "roundNumber": workflow.currentRound, # Use current workflow round "taskNumber": task_index, "actionNumber": action_number }) @@ -714,9 +744,11 @@ class HandlingTasks: "documentsLabel": f"task_{task_index}_completion", "documents": [], # Add workflow context fields - "roundNumber": 1, # Task completion is always round 1 + "roundNumber": workflow.currentRound, # Use current workflow round "taskNumber": task_index, - "actionNumber": 0 + "actionNumber": 0, + # Add task progress status + "taskProgress": "success" } # Add user-friendly message if available @@ -799,6 +831,26 @@ class HandlingTasks: logger.info(f"Reason: {review_result.reason}") logger.info("=== END RETRY SUMMARY ===") + # Create retry message for user + retry_message = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"🔄 Task {task_index} requires retry: {review_result.improvements}", + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": f"task_{task_index}_retry", + "documents": [], + "roundNumber": workflow.currentRound, + "taskNumber": task_index, + "actionNumber": 0, + "taskProgress": "retry" + } + + message = self.chatInterface.createWorkflowMessage(retry_message) + if message: + workflow.messages.append(message) + continue else: logger.error(f"=== TASK {task_index or '?'} FAILED: {task_step.objective} after {attempt+1} attempts ===") @@ -840,9 +892,11 @@ class HandlingTasks: "documentsLabel": None, "documents": [], # Add workflow context fields - "roundNumber": 1, # Task retry is always round 1 + "roundNumber": workflow.currentRound, # Use current workflow round "taskNumber": task_index, - "actionNumber": 0 + "actionNumber": 0, + # Add task progress status + "taskProgress": "retry" } try: @@ -893,9 +947,11 @@ class HandlingTasks: "documentsLabel": None, "documents": [], # Add workflow context fields - "roundNumber": 1, # Task failure is always round 1 + "roundNumber": workflow.currentRound, # Use current workflow round "taskNumber": task_index, - "actionNumber": 0 + "actionNumber": 0, + # NEW: Add task progress status + "taskProgress": "fail" } try: @@ -1306,7 +1362,7 @@ class HandlingTasks: message_text += f"**Result Label:** {result_label}\n" # Add comprehensive workflow context - current_round = workflow_context.get('currentRound', 1) + current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 0) total_tasks = workflow_stats.get('totalTasks', 0) current_action = workflow_context.get('currentAction', 0) @@ -1330,7 +1386,7 @@ class HandlingTasks: message_text += f"**Result Label:** {result_label}\n" # Add comprehensive workflow context - current_round = workflow_context.get('currentRound', 1) + current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 0) total_tasks = workflow_stats.get('totalTasks', 0) current_action = workflow_context.get('currentAction', 0) @@ -1346,7 +1402,7 @@ class HandlingTasks: message_text += f"- Action: {current_action}/{total_actions}\n" else: message_text += f"- Action: {current_action}\n" - message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}" + message_text += f"- Status: {workflow_stats.get('workflowStats', 'unknown')}" else: # ⚠️ FAILURE MESSAGE - Show error details to user error_details = result.error if result.error else "Unknown error occurred" @@ -1356,7 +1412,7 @@ class HandlingTasks: message_text += f"**Result Label:** {result_label}\n" # Add comprehensive workflow context - current_round = workflow_context.get('currentRound', 1) + current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 0) total_tasks = workflow_stats.get('totalTasks', 0) current_action = workflow_context.get('currentAction', 0) @@ -1370,7 +1426,6 @@ class HandlingTasks: message_text += f"- Task: {current_task}\n" if total_actions > 0: message_text += f"- Action: {current_action}/{total_actions}\n" - else: message_text += f"- Action: {current_action}\n" message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}\n\n" message_text += "Please check the connection and try again." @@ -1388,9 +1443,10 @@ class HandlingTasks: "documentsLabel": result_label, "documents": created_documents, # Add workflow context fields - extract from result_label to match document reference - "roundNumber": workflow_context.get('currentRound', 1), + "roundNumber": workflow_context.get('currentRound', 0), "taskNumber": task_index, - "actionNumber": self._extractActionNumberFromLabel(result_label) if result_label else workflow_context.get('currentAction', 0) + "actionNumber": self._extractActionNumberFromLabel(result_label) if result_label else workflow_context.get('currentAction', 0), + "actionProgress": "success" if result.success else "fail" } # Add user-friendly message if available @@ -1538,4 +1594,143 @@ class HandlingTasks: return first_doc.documentData.get("result", "") elif isinstance(first_doc.documentData, str): return first_doc.documentData - return "" \ No newline at end of file + return "" + + # PHASE 4: Workflow Object Update Rules Implementation + + def updateWorkflowAfterTaskPlanCreated(self, total_tasks: int): + """ + Update workflow object after task plan is created. + Rule: Set currentTask=1, currentAction=0, totalTasks=extracted_total_tasks, totalActions=0 + """ + try: + update_data = { + "currentTask": 1, + "currentAction": 0, + "totalTasks": total_tasks, + "totalActions": 0 + } + + # Update workflow object + self.workflow.currentTask = 1 + self.workflow.currentAction = 0 + self.workflow.totalTasks = total_tasks + self.workflow.totalActions = 0 + + # Update in database + self.chatInterface.updateWorkflow(self.workflow.id, update_data) + logger.info(f"Updated workflow {self.workflow.id} after task plan created: {update_data}") + + except Exception as e: + logger.error(f"Error updating workflow after task plan created: {str(e)}") + + def updateWorkflowBeforeExecutingTask(self, task_number: int): + """ + Update workflow object before executing a task. + Rule: Set currentTask=task_number, currentAction=0, totalActions=0 + """ + try: + update_data = { + "currentTask": task_number, + "currentAction": 0, + "totalActions": 0 + } + + # Update workflow object + self.workflow.currentTask = task_number + self.workflow.currentAction = 0 + self.workflow.totalActions = 0 + + # Update in database + self.chatInterface.updateWorkflow(self.workflow.id, update_data) + logger.info(f"Updated workflow {self.workflow.id} before executing task {task_number}: {update_data}") + + except Exception as e: + logger.error(f"Error updating workflow before executing task: {str(e)}") + + def updateWorkflowAfterActionPlanning(self, total_actions: int): + """ + Update workflow object after action planning for current task. + Rule: Set totalActions=extracted_total_actions for THIS task + """ + try: + update_data = { + "totalActions": total_actions + } + + # Update workflow object + self.workflow.totalActions = total_actions + + # Update in database + self.chatInterface.updateWorkflow(self.workflow.id, update_data) + logger.info(f"Updated workflow {self.workflow.id} after action planning: {update_data}") + + except Exception as e: + logger.error(f"Error updating workflow after action planning: {str(e)}") + + def updateWorkflowBeforeExecutingAction(self, action_number: int): + """ + Update workflow object before executing an action. + Rule: Set currentAction=action_number + """ + try: + update_data = { + "currentAction": action_number + } + + # Update workflow object + self.workflow.currentAction = action_number + + # Update in database + self.chatInterface.updateWorkflow(self.workflow.id, update_data) + logger.info(f"Updated workflow {self.workflow.id} before executing action {action_number}: {update_data}") + + except Exception as e: + logger.error(f"Error updating workflow before executing action: {str(e)}") + + def setWorkflowTotals(self, total_tasks: int = None, total_actions: int = None): + """Set total counts for workflow progress tracking and update database""" + try: + update_data = {} + + if total_tasks is not None: + self.workflow.totalTasks = total_tasks + update_data["totalTasks"] = total_tasks + + if total_actions is not None: + self.workflow.totalActions = total_actions + update_data["totalActions"] = total_actions + + # Update workflow object in database if we have changes + if update_data: + self.chatInterface.updateWorkflow(self.workflow.id, update_data) + logger.info(f"Updated workflow {self.workflow.id} totals in database: {update_data}") + + logger.debug(f"Updated workflow totals: Tasks {self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 'N/A'}, Actions {self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 'N/A'}") + except Exception as e: + logger.error(f"Error setting workflow totals: {str(e)}") + + def resetWorkflowForNewSession(self): + """Reset workflow values for a new workflow session""" + try: + # Reset all workflow progress values to initial state + self.workflow.currentRound = 0 + self.workflow.currentTask = 0 + self.workflow.currentAction = 0 + self.workflow.totalTasks = 0 + self.workflow.totalActions = 0 + self.workflow.status = 'ready' + + # Update workflow object in database with reset values + self.chatInterface.updateWorkflow(self.workflow.id, { + "currentRound": 0, + "currentTask": 0, + "currentAction": 0, + "totalTasks": 0, + "totalActions": 0, + "status": "ready" + }) + + logger.info("Workflow reset for new session - all values set to initial state and updated in database") + except Exception as e: + logger.error(f"Error resetting workflow for new session: {str(e)}") \ No newline at end of file diff --git a/modules/chat/handling/promptFactory.py b/modules/chat/handling/promptFactory.py index 9ff277c5..884606e4 100644 --- a/modules/chat/handling/promptFactory.py +++ b/modules/chat/handling/promptFactory.py @@ -179,7 +179,7 @@ Previous review feedback: # Get current workflow context for dynamic examples workflow_context = service.getWorkflowContext() - current_round = workflow_context.get('currentRound', 1) + current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 1) prompt = f""" diff --git a/modules/chat/serviceCenter.py b/modules/chat/serviceCenter.py index d1524196..542d4a20 100644 --- a/modules/chat/serviceCenter.py +++ b/modules/chat/serviceCenter.py @@ -1134,19 +1134,19 @@ Please provide a comprehensive summary of this conversation.""" """Get current workflow context for document generation""" try: return { - 'currentRound': self.workflow.currentRound if hasattr(self.workflow, 'currentRound') else 1, + 'currentRound': self.workflow.currentRound if hasattr(self.workflow, 'currentRound') else 0, 'currentTask': self.workflow.currentTask if hasattr(self.workflow, 'currentTask') else 0, 'currentAction': self.workflow.currentAction if hasattr(self.workflow, 'currentAction') else 0 } except Exception as e: logger.error(f"Error getting workflow context: {str(e)}") - return {'currentRound': 1, 'currentTask': 0, 'currentAction': 0} + return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0} def incrementWorkflowContext(self, context_type: str): """Increment workflow context counters""" try: if context_type == 'round': - current_round = self.workflow.currentRound if hasattr(self.workflow, 'currentRound') else 1 + current_round = self.workflow.currentRound if hasattr(self.workflow, 'currentRound') else 0 self.workflow.currentRound = current_round + 1 # Reset task and action when round changes self.workflow.currentTask = 0 @@ -1183,7 +1183,7 @@ Please provide a comprehensive summary of this conversation.""" except Exception as e: logger.error(f"Error getting workflow stats: {str(e)}") return { - 'currentRound': 1, + 'currentRound': 0, 'currentTask': 0, 'currentAction': 0, 'totalTasks': 0, @@ -1206,17 +1206,9 @@ Please provide a comprehensive summary of this conversation.""" except Exception as e: logger.error(f"Error refreshing file attributes for document {doc.id}: {e}") - def setWorkflowTotals(self, total_tasks: int = None, total_actions: int = None): - """Set total counts for workflow progress tracking""" - try: - if total_tasks is not None: - self.workflow.totalTasks = total_tasks - if total_actions is not None: - self.workflow.totalActions = total_actions - - logger.debug(f"Updated workflow totals: Tasks {self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 'N/A'}, Actions {self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 'N/A'}") - except Exception as e: - logger.error(f"Error setting workflow totals: {str(e)}") + # Note: Workflow progress update methods have been moved to handlingTasks.py + # where they belong since that's where the actual workflow execution happens + # This avoids circular import issues between ServiceCenter and ChatInterface def diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]: """ diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py index 66e86624..bc751f4f 100644 --- a/modules/interfaces/interfaceChatModel.py +++ b/modules/interfaces/interfaceChatModel.py @@ -483,6 +483,30 @@ class ChatMessage(BaseModel, ModelMixin): roundNumber: Optional[int] = Field(None, description="Round number in workflow") taskNumber: Optional[int] = Field(None, description="Task number within round") actionNumber: Optional[int] = Field(None, description="Action number within task") + + # New workflow progress fields: + taskProgress: Optional[str] = Field( + None, + description="Task progress status: pending, running, success, fail, retry", + frontend_options=[ + {"value": "pending", "label": {"en": "Pending", "fr": "En attente"}}, + {"value": "running", "label": {"en": "Running", "fr": "En cours"}}, + {"value": "success", "label": {"en": "Success", "fr": "Succès"}}, + {"value": "fail", "label": {"en": "Failed", "fr": "Échec"}}, + {"value": "retry", "label": {"en": "Retry", "fr": "Nouvel essai"}} + ] + ) + + actionProgress: Optional[str] = Field( + None, + description="Action progress status: pending, running, success, fail", + frontend_options=[ + {"value": "pending", "label": {"en": "Pending", "fr": "En attente"}}, + {"value": "running", "label": {"en": "Running", "fr": "En cours"}}, + {"value": "success", "label": {"en": "Success", "fr": "Succès"}}, + {"value": "fail", "label": {"en": "Failed", "fr": "Échec"}} + ] + ) # Register labels for ChatMessage register_model_labels( @@ -506,7 +530,9 @@ register_model_labels( "actionName": {"en": "Action Name", "fr": "Nom de l'action"}, "roundNumber": {"en": "Round Number", "fr": "Numéro de tour"}, "taskNumber": {"en": "Task Number", "fr": "Numéro de tâche"}, - "actionNumber": {"en": "Action Number", "fr": "Numéro d'action"} + "actionNumber": {"en": "Action Number", "fr": "Numéro d'action"}, + "taskProgress": {"en": "Task Progress", "fr": "Progression de la tâche"}, + "actionProgress": {"en": "Action Progress", "fr": "Progression de l'action"} } ) diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index c73f2622..e98565cd 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -157,7 +157,7 @@ class ChatObjects: id=workflow["id"], status=workflow.get("status", "running"), name=workflow.get("name"), - currentRound=workflow.get("currentRound", 1), + currentRound=workflow.get("currentRound", 0), # Fixed: Default to 0 for new workflows currentTask=workflow.get("currentTask", 0), currentAction=workflow.get("currentAction", 0), totalTasks=workflow.get("totalTasks", 0), @@ -202,7 +202,7 @@ class ChatObjects: id=created["id"], status=created.get("status", "running"), name=created.get("name"), - currentRound=created.get("currentRound", 1), + currentRound=created.get("currentRound", 0), # Fixed: Default to 0 for new workflows currentTask=created.get("currentTask", 0), currentAction=created.get("currentAction", 0), totalTasks=created.get("totalTasks", 0), @@ -352,6 +352,20 @@ class ChatObjects: if "agentName" not in messageData: messageData["agentName"] = "" + # CRITICAL FIX: Automatically set roundNumber, taskNumber, and actionNumber if not provided + # This ensures messages have the correct progress context when workflows are continued + if "roundNumber" not in messageData: + messageData["roundNumber"] = workflow.currentRound + logger.debug(f"Auto-setting roundNumber to {workflow.currentRound} for message {messageData['id']}") + + if "taskNumber" not in messageData: + messageData["taskNumber"] = workflow.currentTask + logger.debug(f"Auto-setting taskNumber to {workflow.currentTask} for message {messageData['id']}") + + if "actionNumber" not in messageData: + messageData["actionNumber"] = workflow.currentAction + logger.debug(f"Auto-setting actionNumber to {workflow.currentAction} for message {messageData['id']}") + # Convert ChatDocument objects to dictionaries for database storage if "documents" in messageData and messageData["documents"]: documents_for_db = [] @@ -383,7 +397,15 @@ class ChatObjects: status=createdMessage.get("status", "step"), sequenceNr=len(workflow.messages) + 1, # Use messages list length for sequence number publishedAt=createdMessage.get("publishedAt", get_utc_timestamp()), - stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None + stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None, + # CRITICAL FIX: Include the progress fields in the ChatMessage object + roundNumber=createdMessage.get("roundNumber"), + taskNumber=createdMessage.get("taskNumber"), + actionNumber=createdMessage.get("actionNumber"), + success=createdMessage.get("success"), + actionId=createdMessage.get("actionId"), + actionMethod=createdMessage.get("actionMethod"), + actionName=createdMessage.get("actionName") ) except Exception as e: @@ -861,14 +883,16 @@ class ChatObjects: await asyncio.sleep(0.1) # Update workflow - set status back to running for resumed workflows + newRound = workflow.currentRound + 1 self.updateWorkflow(workflowId, { "status": "running", # Set status back to running for resumed workflows "lastActivity": currentTime, - "currentRound": workflow.currentRound + 1 + "currentRound": newRound }) - # Update the workflow object status as well + # Update the workflow object status and round number as well workflow.status = "running" + workflow.currentRound = newRound # Add log entry for workflow resumption self.createWorkflowLog({ @@ -886,7 +910,7 @@ class ChatObjects: "status": "running", "startedAt": currentTime, "lastActivity": currentTime, - "currentRound": 1, + "currentRound": 0, # Fixed: Start with 0, will be set to 1 when workflow starts "mandateId": self.mandateId, "messageIds": [], "dataStats": { @@ -899,6 +923,13 @@ class ChatObjects: # Create workflow workflow = self.createWorkflow(workflowData) + # Ensure workflow is in clean state for new session + if hasattr(workflow, 'currentRound') and workflow.currentRound != 0: + logger.warning(f"New workflow has currentRound={workflow.currentRound}, resetting to 0") + workflow.currentRound = 0 + workflow.currentTask = 0 + workflow.currentAction = 0 + # Initialize stats for the new workflow self.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) @@ -907,6 +938,9 @@ class ChatObjects: # Start workflow processing from modules.workflow.managerWorkflow import WorkflowManager workflowManager = WorkflowManager(self, currentUser) + + # Start the workflow processing asynchronously + # The workflow will be updated with progress data during execution asyncio.create_task(workflowManager.workflowProcess(userInput, workflow)) return workflow diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index 86d9b3e2..d69ca14f 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -67,7 +67,7 @@ async def get_workflows( id=workflow_data["id"], status=workflow_data.get("status", "running"), name=workflow_data.get("name"), - currentRound=workflow_data.get("currentRound", 1), + currentRound=workflow_data.get("currentRound", 0), # Fixed: Default to 0 for new workflows currentTask=workflow_data.get("currentTask", 0), currentAction=workflow_data.get("currentAction", 0), totalTasks=workflow_data.get("totalTasks", 0), diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py index abadf7c4..b30b9c4b 100644 --- a/modules/workflow/managerWorkflow.py +++ b/modules/workflow/managerWorkflow.py @@ -41,8 +41,10 @@ class WorkflowManager: # Process workflow results await self._processWorkflowResults(workflow, workflow_result, message) - # Send last message - await self._sendLastMessage(workflow) + # Only send last message for successful workflows + # Stopped/failed workflows get their final messages in _processWorkflowResults + if workflow_result.status == 'success': + await self._sendLastMessage(workflow) except WorkflowStoppedException: logger.info("Workflow stopped by user") @@ -65,7 +67,14 @@ class WorkflowManager: "sequenceNr": len(workflow.messages) + 1, "publishedAt": get_utc_timestamp(), "documentsLabel": "workflow_stopped", - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "pending", + "actionProgress": "pending" } message = self.chatInterface.createWorkflowMessage(stopped_message) if message: @@ -100,7 +109,16 @@ class WorkflowManager: "message": f"Workflow processing failed: {str(e)}", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp() + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_error", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "fail", + "actionProgress": "fail" } message = self.chatInterface.createWorkflowMessage(error_message) if message: @@ -129,7 +147,16 @@ class WorkflowManager: "message": userInput.prompt, "status": "first", "sequenceNr": 1, - "publishedAt": get_utc_timestamp() + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_start", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "pending", + "actionProgress": "pending" } # Add documents if any @@ -178,15 +205,16 @@ class WorkflowManager: return "Workflow processing completed." async def _sendLastMessage(self, workflow: ChatWorkflow) -> None: - """Send last message to complete workflow""" + """Send last message to complete workflow (only for successful workflows)""" try: - self.chatManager.handlingTasks._checkWorkflowStopped() + # Safety check: ensure this is only called for successful workflows + if workflow.status in ['stopped', 'failed']: + logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") + return # Generate feedback feedback = await self._generateWorkflowFeedback(workflow) - self.chatManager.handlingTasks._checkWorkflowStopped() - # Create last message using interface messageData = { "workflowId": workflow.id, @@ -194,7 +222,16 @@ class WorkflowManager: "message": feedback, "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp() + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_feedback", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "success", + "actionProgress": "success" } # Create message using interface @@ -242,7 +279,14 @@ class WorkflowManager: "sequenceNr": len(workflow.messages) + 1, "publishedAt": get_utc_timestamp(), "documentsLabel": "workflow_stopped", - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "stopped", + "actionProgress": "stopped" } message = self.chatInterface.createWorkflowMessage(stopped_message) if message: @@ -267,7 +311,14 @@ class WorkflowManager: "sequenceNr": len(workflow.messages) + 1, "publishedAt": get_utc_timestamp(), "documentsLabel": "workflow_stopped", - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "stopped", + "actionProgress": "stopped" } message = self.chatInterface.createWorkflowMessage(stopped_message) if message: @@ -282,6 +333,15 @@ class WorkflowManager: "totalTasks": workflow.totalTasks, "totalActions": workflow.totalActions }) + + # Add stopped log entry + self.chatInterface.createWorkflowLog({ + "workflowId": workflow.id, + "message": "Workflow stopped by user", + "type": "warning", + "status": "stopped", + "progress": 100 + }) return elif workflow_result.status == 'failed': # Create error message @@ -291,7 +351,16 @@ class WorkflowManager: "message": f"Workflow failed: {workflow_result.error or 'Unknown error'}", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp() + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_failure", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "fail", + "actionProgress": "fail" } message = self.chatInterface.createWorkflowMessage(error_message) if message: @@ -306,6 +375,15 @@ class WorkflowManager: "totalTasks": workflow.totalTasks, "totalActions": workflow.totalActions }) + + # Add failed log entry + self.chatInterface.createWorkflowLog({ + "workflowId": workflow.id, + "message": f"Workflow failed: {workflow_result.error or 'Unknown error'}", + "type": "error", + "status": "failed", + "progress": 100 + }) return # For successful workflows, create a simple completion message @@ -315,7 +393,16 @@ class WorkflowManager: "message": f"Workflow completed successfully. Completed {workflow_result.completed_tasks}/{workflow_result.total_tasks} tasks in {workflow_result.execution_time:.2f} seconds.", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp() + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_completion", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "success", + "actionProgress": "success" } message = self.chatInterface.createWorkflowMessage(summary_message) @@ -332,6 +419,15 @@ class WorkflowManager: "totalActions": workflow.totalActions }) + # Add completion log entry + self.chatInterface.createWorkflowLog({ + "workflowId": workflow.id, + "message": "Workflow completed successfully", + "type": "success", + "status": "completed", + "progress": 100 + }) + except Exception as e: logger.error(f"Error processing workflow results: {str(e)}") # Create error message @@ -341,7 +437,16 @@ class WorkflowManager: "message": f"Error processing workflow results: {str(e)}", "status": "last", "sequenceNr": len(workflow.messages) + 1, - "publishedAt": get_utc_timestamp() + "publishedAt": get_utc_timestamp(), + "documentsLabel": "workflow_error", + "documents": [], + # Add workflow context fields + "roundNumber": workflow.currentRound, + "taskNumber": 0, + "actionNumber": 0, + # Add progress status + "taskProgress": "fail", + "actionProgress": "fail" } message = self.chatInterface.createWorkflowMessage(error_message) if message: