diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py index 2d6a2b1b..32f29922 100644 --- a/modules/features/syncDelta/mainSyncDelta.py +++ b/modules/features/syncDelta/mainSyncDelta.py @@ -83,7 +83,7 @@ class ManagerSyncDelta: self.APP_ENV_TYPE = self.services.utils.configGet("APP_ENV_TYPE", "dev") self.JIRA_API_TOKEN = self.services.utils.configGet("Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET", "") # Resolve SharePoint connection for the configured user id - self.sharepointConnection = self.services.workflow.getUserConnectionByExternalUsername("msft", self.SHAREPOINT_USER_ID) + self.sharepointConnection = self.services.chat.getUserConnectionByExternalUsername("msft", self.SHAREPOINT_USER_ID) if not self.sharepointConnection: logger.error( f"No SharePoint connection found for user: {self.SHAREPOINT_USER_ID}" diff --git a/modules/services/__init__.py b/modules/services/__init__.py index 87b13207..3e33d208 100644 --- a/modules/services/__init__.py +++ b/modules/services/__init__.py @@ -75,8 +75,8 @@ class Services: from .serviceTicket.mainServiceTicket import TicketService self.ticket = PublicService(TicketService(self)) - from .serviceWorkflow.mainServiceWorkflow import WorkflowService - self.workflow = PublicService(WorkflowService(self)) + from .serviceChat.mainServiceChat import ChatService + self.chat = PublicService(ChatService(self)) from .serviceUtils.mainServiceUtils import UtilsService self.utils = PublicService(UtilsService(self)) diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 2cac2905..95c51779 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -205,11 +205,11 @@ Respond with ONLY a JSON object in this exact format: # Update progress for iteration start if operationId: if iteration == 1: - self.services.workflow.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}") + self.services.chat.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}") else: # For continuation iterations, show progress incrementally baseProgress = 0.5 + (min(iteration - 1, maxIterations) / maxIterations * 0.4) # Progress from 0.5 to 0.9 over maxIterations iterations - self.services.workflow.progressLogUpdate(operationId, baseProgress, f"Continuing generation (iteration {iteration})") + self.services.chat.progressLogUpdate(operationId, baseProgress, f"Continuing generation (iteration {iteration})") # Build iteration prompt if len(allSections) > 0 and promptBuilder and promptArgs: @@ -227,7 +227,7 @@ Respond with ONLY a JSON object in this exact format: # Make AI call try: if operationId and iteration == 1: - self.services.workflow.progressLogUpdate(operationId, 0.51, "Calling AI model") + self.services.chat.progressLogUpdate(operationId, 0.51, "Calling AI model") request = AiCallRequest( prompt=iterationPrompt, context="", @@ -246,10 +246,10 @@ Respond with ONLY a JSON object in this exact format: # Update progress after AI call if operationId: if iteration == 1: - self.services.workflow.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})") + self.services.chat.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})") else: progress = 0.6 + (min(iteration - 1, 10) * 0.03) - self.services.workflow.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})") + self.services.chat.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})") # Write raw AI response to debug file if iteration == 1: @@ -258,8 +258,8 @@ Respond with ONLY a JSON object in this exact format: self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}") # Emit stats for this iteration - self.services.workflow.storeWorkflowStat( - self.services.currentWorkflow, + self.services.chat.storeWorkflowStat( + self.services.workflow, response, f"ai.call.{debugPrefix}.iteration_{iteration}" ) @@ -286,7 +286,7 @@ Respond with ONLY a JSON object in this exact format: # Update progress after parsing if operationId: if extractedSections: - self.services.workflow.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})") + self.services.chat.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})") if not extractedSections: # If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry @@ -306,7 +306,7 @@ Respond with ONLY a JSON object in this exact format: else: # Done - build final result if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)") + self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)") logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections") break @@ -566,11 +566,11 @@ Respond with ONLY a JSON object in this exact format: await self._ensureAiObjectsInitialized() # Create separate operationId for detailed progress tracking - workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}" + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" aiOperationId = f"ai_documents_{workflowId}_{int(time.time())}" # Start progress tracking for this operation - self.services.workflow.progressLogStart( + self.services.chat.progressLogStart( aiOperationId, "AI call with documents", "Document Generation", @@ -580,7 +580,7 @@ Respond with ONLY a JSON object in this exact format: try: if options is None or (hasattr(options, 'operationType') and options.operationType is None): # Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None) - self.services.workflow.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters") + self.services.chat.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters") options = await self._analyzePromptAndCreateOptions(prompt) # Check operationType FIRST - some operations need direct routing (before document generation checks) @@ -591,7 +591,7 @@ Respond with ONLY a JSON object in this exact format: if isImageRequest: # Image generation uses generic call path but bypasses document generation pipeline - self.services.workflow.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation") + self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation") # Call via generic path (no looping for images) request = AiCallRequest( @@ -621,19 +621,19 @@ Respond with ONLY a JSON object in this exact format: result = response.content # Emit stats for image generation - self.services.workflow.storeWorkflowStat( - self.services.currentWorkflow, + self.services.chat.storeWorkflowStat( + self.services.workflow, response, f"ai.generate.image" ) - self.services.workflow.progressLogUpdate(aiOperationId, 0.9, "Image generated") - self.services.workflow.progressLogFinish(aiOperationId, True) + self.services.chat.progressLogUpdate(aiOperationId, 0.9, "Image generated") + self.services.chat.progressLogFinish(aiOperationId, True) return result else: errorMsg = f"No image data returned: {response.content}" logger.error(f"Error in AI image generation: {errorMsg}") - self.services.workflow.progressLogFinish(aiOperationId, False) + self.services.chat.progressLogFinish(aiOperationId, False) return {"success": False, "error": errorMsg} # Handle WEB_SEARCH and WEB_CRAWL operations - route directly to connectors @@ -645,7 +645,7 @@ Respond with ONLY a JSON object in this exact format: # Web operations: prompt is already structured JSON (AiCallPromptWebSearch/WebCrawl) # Route directly through centralized AI call - model selector chooses appropriate connector # Connector parses the JSON prompt and executes the operation - self.services.workflow.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}") + self.services.chat.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}") request = AiCallRequest( prompt=prompt, # Pass raw JSON prompt unchanged - connector will parse it @@ -658,19 +658,19 @@ Respond with ONLY a JSON object in this exact format: # Extract result from response if response.content: # Emit stats for web operation - self.services.workflow.storeWorkflowStat( - self.services.currentWorkflow, + self.services.chat.storeWorkflowStat( + self.services.workflow, response, f"ai.{opType.name.lower()}" ) - self.services.workflow.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed") - self.services.workflow.progressLogFinish(aiOperationId, True) + self.services.chat.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed") + self.services.chat.progressLogFinish(aiOperationId, True) return response.content else: errorMsg = f"No content returned from {opType.name}: {response.content}" logger.error(f"Error in {opType.name}: {errorMsg}") - self.services.workflow.progressLogFinish(aiOperationId, False) + self.services.chat.progressLogFinish(aiOperationId, False) return {"success": False, "error": errorMsg} # CRITICAL: For document generation with JSON templates, NEVER compress the prompt @@ -685,13 +685,13 @@ Respond with ONLY a JSON object in this exact format: if outputFormat: # Use unified generation method for all document generation if documents and len(documents) > 0: - self.services.workflow.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents") + self.services.chat.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents") extracted_content = await self.callAiText(prompt, documents, options, aiOperationId) else: - self.services.workflow.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation") + self.services.chat.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation") extracted_content = None - self.services.workflow.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt") + self.services.chat.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt") from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt # First call without continuation context generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None) @@ -704,7 +704,7 @@ Respond with ONLY a JSON object in this exact format: "extracted_content": extracted_content } - self.services.workflow.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation") + self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation") generated_json = await self._callAiWithLooping( generation_prompt, options, @@ -714,7 +714,7 @@ Respond with ONLY a JSON object in this exact format: aiOperationId ) - self.services.workflow.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON") + self.services.chat.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON") # Parse the generated JSON (extract fenced/embedded JSON first) try: extracted_json = self.services.utils.jsonExtractString(generated_json) @@ -728,10 +728,10 @@ Respond with ONLY a JSON object in this exact format: # Write the problematic JSON to debug file self.services.utils.writeDebugFile(generated_json, "failed_json_parsing") - self.services.workflow.progressLogFinish(aiOperationId, False) + self.services.chat.progressLogFinish(aiOperationId, False) return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"} - self.services.workflow.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format") + self.services.chat.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format") # Render to final format using the existing renderer try: from modules.services.serviceGeneration.mainServiceGeneration import GenerationService @@ -761,16 +761,16 @@ Respond with ONLY a JSON object in this exact format: # Log AI response for debugging self.services.utils.writeDebugFile(str(result), "document_generation_response", documents) - self.services.workflow.progressLogFinish(aiOperationId, True) + self.services.chat.progressLogFinish(aiOperationId, True) return result except Exception as e: logger.error(f"Error rendering document: {str(e)}") - self.services.workflow.progressLogFinish(aiOperationId, False) + self.services.chat.progressLogFinish(aiOperationId, False) return {"success": False, "error": f"Rendering failed: {str(e)}"} # Handle text calls (no output format specified) - self.services.workflow.progressLogUpdate(aiOperationId, 0.5, "Processing text call") + self.services.chat.progressLogUpdate(aiOperationId, 0.5, "Processing text call") if documents: # Use document processing for text calls with documents result = await self.callAiText(prompt, documents, options, aiOperationId) @@ -778,12 +778,12 @@ Respond with ONLY a JSON object in this exact format: # Use shared core function for direct text calls result = await self._callAiWithLooping(prompt, options, "text", None, None, aiOperationId) - self.services.workflow.progressLogFinish(aiOperationId, True) + self.services.chat.progressLogFinish(aiOperationId, True) return result except Exception as e: logger.error(f"Error in callAiDocuments: {str(e)}") - self.services.workflow.progressLogFinish(aiOperationId, False) + self.services.chat.progressLogFinish(aiOperationId, False) raise async def callAiText( diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceChat/mainServiceChat.py similarity index 90% rename from modules/services/serviceWorkflow/mainServiceWorkflow.py rename to modules/services/serviceChat/mainServiceChat.py index 65019280..495985b5 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceChat/mainServiceChat.py @@ -8,13 +8,13 @@ from modules.shared.progressLogger import ProgressLogger logger = logging.getLogger(__name__) -class WorkflowService: +class ChatService: """Service class containing methods for document processing, chat operations, and workflow management""" def __init__(self, serviceCenter): self.services = serviceCenter self.user = serviceCenter.user - self.workflow = serviceCenter.workflow + # self.services.workflow is now the ChatWorkflow object (stable during workflow execution) self.interfaceDbChat = serviceCenter.interfaceDbChat self.interfaceDbComponent = serviceCenter.interfaceDbComponent self.interfaceDbApp = serviceCenter.interfaceDbApp @@ -23,11 +23,16 @@ class WorkflowService: def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]: """Get ChatDocuments from a list of document references using all three formats.""" try: - workflow = self.services.currentWorkflow - workflow_id = workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID' - workflow_obj_id = id(workflow) if workflow else None + # Use self.services.workflow which is the ChatWorkflow object (stable during workflow execution) + workflow = self.services.workflow + if not workflow: + logger.error("getChatDocumentsFromDocumentList: No workflow available (self.services.workflow is not set)") + return [] + + workflow_id = workflow.id if hasattr(workflow, 'id') else 'NO_ID' + workflow_obj_id = id(workflow) logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}") - logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow_id}, workflow object id = {workflow_obj_id}") + logger.debug(f"getChatDocumentsFromDocumentList: using workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}") # Debug: list available messages with their labels and document names try: @@ -59,6 +64,11 @@ class WorkflowService: doc_id = parts[1] # Find the document by ID for message in workflow.messages: + # Validate message belongs to this workflow + msg_workflow_id = getattr(message, 'workflowId', None) + if msg_workflow_id and msg_workflow_id != workflow_id: + continue + if message.documents: for doc in message.documents: if doc.id == doc_id: @@ -75,6 +85,11 @@ class WorkflowService: # First try to find the message by ID in the current workflow message_found = None for message in workflow.messages: + # Validate message belongs to this workflow + msg_workflow_id = getattr(message, 'workflowId', None) + if msg_workflow_id and msg_workflow_id != workflow_id: + continue + if str(message.id) == message_id: message_found = message break @@ -96,6 +111,12 @@ class WorkflowService: label = parts[1] message_found = None for message in workflow.messages: + # Validate message belongs to this workflow + msg_workflow_id = getattr(message, 'workflowId', None) + if msg_workflow_id and msg_workflow_id != workflow_id: + logger.warning(f"Message {message.id} has workflowId {msg_workflow_id} but belongs to workflow {workflow_id}. Skipping.") + continue + msg_label = getattr(message, 'documentsLabel', None) if msg_label == label: message_found = message @@ -120,6 +141,12 @@ class WorkflowService: # In case of retries, we want the NEWEST message (most recent publishedAt) matching_messages = [] for message in workflow.messages: + # Validate message belongs to this workflow + msg_workflow_id = getattr(message, 'workflowId', None) + if msg_workflow_id and msg_workflow_id != workflow_id: + logger.debug(f"Skipping message {message.id} with workflowId {msg_workflow_id} (expected {workflow_id})") + continue + msg_documents_label = getattr(message, 'documentsLabel', '') # Check if this message's documentsLabel matches our reference @@ -358,10 +385,13 @@ class WorkflowService: def getWorkflowContext(self) -> Dict[str, int]: """Get current workflow context for document generation""" try: + workflow = self.services.workflow + if not workflow: + return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0} return { - 'currentRound': self.workflow.currentRound if hasattr(self.workflow, 'currentRound') else 0, - 'currentTask': self.workflow.currentTask if hasattr(self.workflow, 'currentTask') else 0, - 'currentAction': self.workflow.currentAction if hasattr(self.workflow, 'currentAction') else 0 + 'currentRound': workflow.currentRound if hasattr(workflow, 'currentRound') else 0, + 'currentTask': workflow.currentTask if hasattr(workflow, 'currentTask') else 0, + 'currentAction': workflow.currentAction if hasattr(workflow, 'currentAction') else 0 } except Exception as e: logger.error(f"Error getting workflow context: {str(e)}") @@ -370,7 +400,10 @@ class WorkflowService: def setWorkflowContext(self, roundNumber: int = None, taskNumber: int = None, actionNumber: int = None): """Set current workflow context for document generation and routing""" try: - workflow = self.services.currentWorkflow + workflow = self.services.workflow + if not workflow: + logger.error("setWorkflowContext: No workflow available") + return # Prepare update data update_data = {} @@ -396,15 +429,26 @@ class WorkflowService: def getWorkflowStats(self) -> Dict[str, Any]: """Get comprehensive workflow statistics including current context""" try: + workflow = self.services.workflow workflow_context = self.getWorkflowContext() + if not workflow: + return { + 'currentRound': workflow_context['currentRound'], + 'currentTask': workflow_context['currentTask'], + 'currentAction': workflow_context['currentAction'], + 'totalTasks': 0, + 'totalActions': 0, + 'workflowStatus': 'unknown', + 'workflowId': 'unknown' + } return { 'currentRound': workflow_context['currentRound'], 'currentTask': workflow_context['currentTask'], 'currentAction': workflow_context['currentAction'], - 'totalTasks': self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 0, - 'totalActions': self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 0, - 'workflowStatus': self.workflow.status if hasattr(self.workflow, 'status') else 'unknown', - 'workflowId': self.workflow.id if hasattr(self.workflow, 'id') else 'unknown' + 'totalTasks': workflow.totalTasks if hasattr(workflow, 'totalTasks') else 0, + 'totalActions': workflow.totalActions if hasattr(workflow, 'totalActions') else 0, + 'workflowStatus': workflow.status if hasattr(workflow, 'status') else 'unknown', + 'workflowId': workflow.id if hasattr(workflow, 'id') else 'unknown' } except Exception as e: logger.error(f"Error getting workflow stats: {str(e)}") @@ -532,7 +576,9 @@ class WorkflowService: def getDocumentCount(self) -> str: """Get document count for task planning (matching old handlingTasks.py logic)""" try: - workflow = self.services.currentWorkflow + workflow = self.services.workflow + if not workflow: + return "No documents available" # Count documents from all messages in the workflow (like old system) total_docs = 0 @@ -551,7 +597,9 @@ class WorkflowService: def getWorkflowHistoryContext(self) -> str: """Get workflow history context for task planning (matching old handlingTasks.py logic)""" try: - workflow = self.services.currentWorkflow + workflow = self.services.workflow + if not workflow: + return "No previous round context available" # Check if there are any previous rounds by looking for "first" messages has_previous_rounds = False @@ -594,8 +642,7 @@ class WorkflowService: workflow_id = workflow.id if hasattr(workflow, 'id') else 'NO_ID' workflow_obj_id = id(workflow) - current_workflow_obj_id = id(self.services.currentWorkflow) if self.services.currentWorkflow else None - logger.debug(f"getAvailableDocuments: workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}, currentWorkflow object id = {current_workflow_obj_id}") + logger.debug(f"getAvailableDocuments: workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}") # Use the provided workflow object directly to avoid database reload issues # that can cause filename truncation. The workflow object should already be up-to-date. @@ -832,22 +879,15 @@ class WorkflowService: logger.error(f"Error getting connection reference list: {str(e)}") return [] - def setCurrentWorkflow(self, workflow): - """Set the current workflow reference for this service""" - self.workflow = workflow - # Reset progress logger for new workflow - self._progressLogger = None def _getProgressLogger(self): """Get or create the progress logger instance""" if self._progressLogger is None: - # Use currentWorkflow from self.services instead of self.workflow (which is self) - workflow = getattr(self.services, 'currentWorkflow', None) - self._progressLogger = ProgressLogger(self, workflow) + self._progressLogger = ProgressLogger(self.services) return self._progressLogger - def createProgressLogger(self, workflow) -> ProgressLogger: - return ProgressLogger(self, workflow) + def createProgressLogger(self) -> ProgressLogger: + return ProgressLogger(self.services) def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = ""): """Wrapper for ProgressLogger.startOperation""" @@ -862,4 +902,5 @@ class WorkflowService: def progressLogFinish(self, operationId: str, success: bool = True): """Wrapper for ProgressLogger.finishOperation""" progressLogger = self._getProgressLogger() - return progressLogger.finishOperation(operationId, success) \ No newline at end of file + return progressLogger.finishOperation(operationId, success) + diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py index 8217d246..4cc7702d 100644 --- a/modules/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -135,8 +135,8 @@ class ExtractionService: errorCount=0 ) - self.services.workflow.storeWorkflowStat( - self.services.currentWorkflow, + self.services.chat.storeWorkflowStat( + self.services.workflow, aiResponse, f"extraction.process.{doc.mimeType}" ) @@ -422,9 +422,9 @@ class ExtractionService: # Create operationId if not provided if not operationId: - workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}" + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"ai_text_extract_{workflowId}_{int(time.time())}" - self.services.workflow.progressLogStart( + self.services.chat.progressLogStart( operationId, "AI Text Extract", "Document Processing", @@ -452,35 +452,35 @@ class ExtractionService: # Extract content WITHOUT chunking if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents") + self.services.chat.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents") extractionResult = self.extractContent(documents, extractionOptions) if not isinstance(extractionResult, list): if operationId: - self.services.workflow.progressLogFinish(operationId, False) + self.services.chat.progressLogFinish(operationId, False) return "[Error: No extraction results]" # Process parts (not chunks) with model-aware AI calls if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") + self.services.chat.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId) # Merge results using existing merging system if operationId: - self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results") + self.services.chat.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results") mergedContent = self._mergePartResults(partResults, options) # Save merged extraction content to debug self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text") if operationId: - self.services.workflow.progressLogFinish(operationId, True) + self.services.chat.progressLogFinish(operationId, True) return mergedContent except Exception as e: logger.error(f"Error in processDocumentsPerChunk: {str(e)}") if operationId: - self.services.workflow.progressLogFinish(operationId, False) + self.services.chat.progressLogFinish(operationId, False) raise async def _processPartsWithMapping( @@ -539,21 +539,22 @@ class ExtractionService: if operationId and totalParts > 0: processedCount[0] += 1 progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9 - self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}") + self.services.chat.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}") # Create progress callback for chunking def chunkingProgressCallback(chunkProgress: float, status: str): """Callback to log chunking progress as ChatLog entries""" - if self.services.workflow and self.services.currentWorkflow: + workflow = self.services.workflow + if workflow: logData = { - "workflowId": self.services.currentWorkflow.id, + "workflowId": workflow.id, "message": "Service AI", "type": "info", "status": status, "progress": chunkProgress } try: - self.services.workflow.storeLog(self.services.currentWorkflow, logData) + self.services.chat.storeLog(workflow, logData) except Exception as e: logger.warning(f"Failed to store chunking progress log: {e}") diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py index 9dddb49d..79730a0a 100644 --- a/modules/services/serviceGeneration/mainServiceGeneration.py +++ b/modules/services/serviceGeneration/mainServiceGeneration.py @@ -1,10 +1,7 @@ import logging import uuid -import time -from typing import Any, Dict, List, Optional, Union, Tuple +from typing import Any, Dict, List, Optional from modules.datamodels.datamodelChat import ChatDocument -from modules.datamodels.datamodelAi import AiCallResponse -from modules.aicore.aicoreModelRegistry import modelRegistry from modules.services.serviceGeneration.subDocumentUtility import ( getFileExtension, getMimeTypeFromExtension, @@ -19,11 +16,10 @@ class GenerationService: def __init__(self, serviceCenter=None): # Directly use interfaces from the provided service center (no self.service calls) self.services = serviceCenter - self.interfaceDbComponent = getattr(serviceCenter, 'interfaceDbComponent', None) if serviceCenter else None - self.interfaceDbChat = getattr(serviceCenter, 'interfaceDbChat', None) if serviceCenter else None - self.workflow = getattr(serviceCenter, 'workflow', None) if serviceCenter else None + self.interfaceDbComponent = serviceCenter.interfaceDbComponent + self.interfaceDbChat = serviceCenter.interfaceDbChat - def processActionResultDocuments(self, action_result, action, workflow) -> List[Dict[str, Any]]: + def processActionResultDocuments(self, actionResult, action) -> List[Dict[str, Any]]: """ Process documents produced by AI actions and convert them to ChatDocument format. This function handles AI-generated document data, not document references. @@ -31,7 +27,7 @@ class GenerationService: """ try: # Read documents from the standard documents field (not data.documents) - documents = action_result.documents if action_result and hasattr(action_result, 'documents') else [] + documents = actionResult.documents if actionResult and hasattr(actionResult, 'documents') else [] if not documents: return [] @@ -69,13 +65,13 @@ class GenerationService: logger.error(f"Error processing single document: {str(e)}") return None - def createDocumentsFromActionResult(self, action_result, action, workflow, message_id=None) -> List[Any]: + def createDocumentsFromActionResult(self, actionResult, action, workflow, message_id=None) -> List[Any]: """ Create actual document objects from action result and store them in the system. Returns a list of created document objects with proper workflow context. """ try: - processed_docs = self.processActionResultDocuments(action_result, action, workflow) + processed_docs = self.processActionResultDocuments(actionResult, action) createdDocuments = [] for i, doc_data in enumerate(processed_docs): diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py index c5e05412..be7609e8 100644 --- a/modules/services/serviceWeb/mainServiceWeb.py +++ b/modules/services/serviceWeb/mainServiceWeb.py @@ -47,7 +47,7 @@ class WebService: """ try: # Step 1: AI intention analysis - extract URLs and parameters from prompt - self.services.workflow.progressLogUpdate(operationId, 0.1, "Analyzing research intent") + self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent") analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) @@ -72,7 +72,7 @@ class WebService: # Step 2: Search for URLs if needed (based on needsSearch flag) if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): - self.services.workflow.progressLogUpdate(operationId, 0.3, "Searching for URLs") + self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs") searchUrls = await self._performWebSearch( instruction=instruction, @@ -84,7 +84,7 @@ class WebService: # Add search URLs to the list allUrls.extend(searchUrls) - self.services.workflow.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") + self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering) if len(allUrls) > maxNumberPages: @@ -99,7 +99,7 @@ class WebService: maxDepth = depthMap.get(finalResearchDepth.lower(), 2) # Step 5: Crawl all URLs - self.services.workflow.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") + self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") crawlResult = await self._performWebCrawl( instruction=instruction, @@ -107,7 +107,7 @@ class WebService: maxDepth=maxDepth ) - self.services.workflow.progressLogUpdate(operationId, 0.9, "Consolidating results") + self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results") # Return consolidated result result = { diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py index e6ecdee7..4d1c890f 100644 --- a/modules/shared/progressLogger.py +++ b/modules/shared/progressLogger.py @@ -13,15 +13,13 @@ logger = logging.getLogger(__name__) class ProgressLogger: """Centralized progress logger for workflow operations.""" - def __init__(self, workflowService, workflow): + def __init__(self, services): """Initialize progress logger. Args: - workflowService: WorkflowService instance for logging - workflow: Workflow object to get workflowId from + services: Services object for accessing chat service and workflow """ - self.workflowService = workflowService - self.workflow = workflow + self.services = services self.activeOperations = {} self.finishedOperations = set() # Track finished operations to avoid repeated warnings @@ -115,8 +113,13 @@ class ProgressLogger: op = self.activeOperations[operationId] message = f"Service {op['service']}" + workflow = self.services.workflow + if not workflow: + logger.warning(f"Cannot log progress: no workflow available") + return + logData = { - "workflowId": self.workflow.id, + "workflowId": workflow.id, "message": message, "type": "info", "status": status, @@ -124,7 +127,7 @@ class ProgressLogger: } try: - self.workflowService.storeLog(self.workflow, logData) + self.services.chat.storeLog(workflow, logData) except Exception as e: logger.error(f"Failed to store progress log: {e}") diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index b54e855b..a1206ba6 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -43,11 +43,11 @@ class MethodAi(MethodBase): """ try: # Init progress logger - workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}" + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"ai_process_{workflowId}_{int(time.time())}" # Start progress tracking - self.services.workflow.progressLogStart( + self.services.chat.progressLogStart( operationId, "Generate", "AI Processing", @@ -58,7 +58,7 @@ class MethodAi(MethodBase): logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})") # Update progress - preparing parameters - self.services.workflow.progressLogUpdate(operationId, 0.2, "Preparing parameters") + self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters") documentList = parameters.get("documentList", []) if isinstance(documentList, str): @@ -79,17 +79,17 @@ class MethodAi(MethodBase): logger.info(f"Using result type: {resultType} -> {output_extension}") # Update progress - preparing documents - self.services.workflow.progressLogUpdate(operationId, 0.3, "Preparing documents") + self.services.chat.progressLogUpdate(operationId, 0.3, "Preparing documents") # Get ChatDocuments for AI service - let AI service handle all document processing chatDocuments = [] if documentList: - chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) + chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList) if chatDocuments: logger.info(f"Prepared {len(chatDocuments)} documents for AI processing") # Update progress - preparing AI call - self.services.workflow.progressLogUpdate(operationId, 0.4, "Preparing AI call") + self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call") # Build options with only resultFormat - let service layer handle all other parameters output_format = output_extension.replace('.', '') or 'txt' @@ -99,7 +99,7 @@ class MethodAi(MethodBase): ) # Update progress - calling AI - self.services.workflow.progressLogUpdate(operationId, 0.6, "Calling AI") + self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI") result = await self.services.ai.callAiDocuments( prompt=aiPrompt, @@ -109,7 +109,7 @@ class MethodAi(MethodBase): ) # Update progress - processing result - self.services.workflow.progressLogUpdate(operationId, 0.8, "Processing result") + self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result") from modules.datamodels.datamodelChat import ActionDocument @@ -147,7 +147,7 @@ class MethodAi(MethodBase): final_documents = [action_document] # Complete progress tracking - self.services.workflow.progressLogFinish(operationId, True) + self.services.chat.progressLogFinish(operationId, True) return ActionResult.isSuccess(documents=final_documents) @@ -156,7 +156,7 @@ class MethodAi(MethodBase): # Complete progress tracking with failure try: - self.services.workflow.progressLogFinish(operationId, False) + self.services.chat.progressLogFinish(operationId, False) except: pass # Don't fail on progress logging errors @@ -186,10 +186,10 @@ class MethodAi(MethodBase): return ActionResult.isFailure(error="Research prompt is required") # Init progress logger - operationId = f"web_research_{self.services.currentWorkflow.id}_{int(time.time())}" + operationId = f"web_research_{self.services.workflow.id}_{int(time.time())}" # Start progress tracking - self.services.workflow.progressLogStart( + self.services.chat.progressLogStart( operationId, "Web Research", "Searching and Crawling", @@ -207,7 +207,7 @@ class MethodAi(MethodBase): ) # Complete progress tracking - self.services.workflow.progressLogFinish(operationId, True) + self.services.chat.progressLogFinish(operationId, True) # Get meaningful filename from research result (generated by intent analyzer) suggestedFilename = result.get("suggested_filename") @@ -249,7 +249,7 @@ class MethodAi(MethodBase): except Exception as e: logger.error(f"Error in web research: {str(e)}") try: - self.services.workflow.progressLogFinish(operationId, False) + self.services.chat.progressLogFinish(operationId, False) except: pass return ActionResult.isFailure(error=str(e)) diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py index 2850f370..3d6742aa 100644 --- a/modules/workflows/methods/methodBase.py +++ b/modules/workflows/methods/methodBase.py @@ -187,7 +187,7 @@ class MethodBase: try: # Get workflow context from services if not provided if workflow_context is None and hasattr(self.services, 'workflow'): - workflow_context = self.services.workflow.getWorkflowContext() + workflow_context = self.services.chat.getWorkflowContext() # Extract round, task, action numbers round_num = workflow_context.get('currentRound', 0) if workflow_context else 0 diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py index c7e2eb35..8e769b99 100644 --- a/modules/workflows/methods/methodOutlook.py +++ b/modules/workflows/methods/methodOutlook.py @@ -36,7 +36,7 @@ class MethodOutlook(MethodBase): logger.debug(f"Getting Microsoft connection for reference: {connectionReference}") # Get the connection from the service - userConnection = self.services.workflow.getUserConnectionFromConnectionReference(connectionReference) + userConnection = self.services.chat.getUserConnectionFromConnectionReference(connectionReference) if not userConnection: logger.error(f"Connection not found: {connectionReference}") return None @@ -44,7 +44,7 @@ class MethodOutlook(MethodBase): logger.debug(f"Found connection: {userConnection.id}, status: {userConnection.status.value}, authority: {userConnection.authority.value}") # Get a fresh token for this connection - token = self.services.workflow.getFreshConnectionToken(userConnection.id) + token = self.services.chat.getFreshConnectionToken(userConnection.id) if not token: logger.error(f"Fresh token not found for connection: {userConnection.id}") logger.debug(f"Connection details: {userConnection}") @@ -1136,7 +1136,7 @@ class MethodOutlook(MethodBase): # Prepare documents for AI processing chatDocuments = [] if documentList: - chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) + chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList) # Create AI prompt for email composition # Build document reference list for AI with expanded list contents when possible @@ -1144,13 +1144,12 @@ class MethodOutlook(MethodBase): doc_list_text = "" if doc_references: lines = ["Available_Document_References:"] - workflow_obj = getattr(self.services, 'currentWorkflow', None) for ref in doc_references: # Each item is a label: resolve to its document list and render contained items - list_docs = self.services.workflow.getChatDocumentsFromDocumentList([ref]) or [] + list_docs = self.services.chat.getChatDocumentsFromDocumentList([ref]) or [] if list_docs: for d in list_docs: - doc_ref_label = self.services.workflow.getDocumentReferenceFromChatDocument(d) + doc_ref_label = self.services.chat.getDocumentReferenceFromChatDocument(d) lines.append(f"- {doc_ref_label}") else: lines.append(" - (no documents)") @@ -1216,7 +1215,7 @@ Return JSON: if documentList: try: available_refs = [documentList] if isinstance(documentList, str) else documentList - available_docs = self.services.workflow.getChatDocumentsFromDocumentList(available_refs) or [] + available_docs = self.services.chat.getChatDocumentsFromDocumentList(available_refs) or [] except Exception: available_docs = [] @@ -1229,7 +1228,7 @@ Return JSON: if ai_attachments: try: ai_refs = [ai_attachments] if isinstance(ai_attachments, str) else ai_attachments - ai_docs = self.services.workflow.getChatDocumentsFromDocumentList(ai_refs) or [] + ai_docs = self.services.chat.getChatDocumentsFromDocumentList(ai_refs) or [] except Exception: ai_docs = [] @@ -1239,15 +1238,15 @@ Return JSON: if selected_docs: # Map selected ChatDocuments back to docItem references - documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in selected_docs] + documentList = [self.services.chat.getDocumentReferenceFromChatDocument(d) for d in selected_docs] logger.info(f"AI selected {len(documentList)} documents for attachment (resolved via ChatDocuments)") else: # No intersection; use all available documents - documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs] + documentList = [self.services.chat.getDocumentReferenceFromChatDocument(d) for d in available_docs] logger.warning("AI selected attachments not found in available documents, using all documents") else: # No AI selection; use all available documents - documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs] + documentList = [self.services.chat.getDocumentReferenceFromChatDocument(d) for d in available_docs] logger.warning("AI did not specify attachments, using all available documents") else: logger.info("No documents provided in documentList; skipping attachment processing") @@ -1297,13 +1296,13 @@ Return JSON: message["attachments"] = [] for attachment_ref in documentList: # Get attachment document from service center - attachment_docs = self.services.workflow.getChatDocumentsFromDocumentList([attachment_ref]) + attachment_docs = self.services.chat.getChatDocumentsFromDocumentList([attachment_ref]) if attachment_docs: for doc in attachment_docs: file_id = getattr(doc, 'fileId', None) if file_id: try: - file_content = self.services.workflow.getFileData(file_id) + file_content = self.services.chat.getFileData(file_id) if file_content: if isinstance(file_content, bytes): content_bytes = file_content diff --git a/modules/workflows/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py index 11bca67a..dea73974 100644 --- a/modules/workflows/methods/methodSharepoint.py +++ b/modules/workflows/methods/methodSharepoint.py @@ -31,7 +31,7 @@ class MethodSharepoint(MethodBase): def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]: """Get Microsoft connection from connection reference and configure SharePoint service""" try: - userConnection = self.services.workflow.getUserConnectionFromConnectionReference(connectionReference) + userConnection = self.services.chat.getUserConnectionFromConnectionReference(connectionReference) if not userConnection: logger.warning(f"No user connection found for reference: {connectionReference}") return None @@ -847,13 +847,13 @@ class MethodSharepoint(MethodBase): try: import json # Resolve the reference label to get the actual document list - document_list = self.services.workflow.getChatDocumentsFromDocumentList([pathObject]) + document_list = self.services.chat.getChatDocumentsFromDocumentList([pathObject]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}") # Get the first document's content (which should be the JSON) first_document = document_list[0] - file_data = self.services.workflow.getFileData(first_document.fileId) + file_data = self.services.chat.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {pathObject}") @@ -881,7 +881,7 @@ class MethodSharepoint(MethodBase): # Get documents from reference - ensure documentList is a list, not a string # documentList is already normalized above - chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) + chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList) if not chatDocuments: return ActionResult.isFailure(error="No documents found for the provided reference") @@ -1127,13 +1127,13 @@ class MethodSharepoint(MethodBase): try: import json # Resolve the reference label to get the actual document list - document_list = self.services.workflow.getChatDocumentsFromDocumentList([pathObject]) + document_list = self.services.chat.getChatDocumentsFromDocumentList([pathObject]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}") # Get the first document's content (which should be the JSON) first_document = document_list[0] - file_data = self.services.workflow.getFileData(first_document.fileId) + file_data = self.services.chat.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {pathObject}") @@ -1228,7 +1228,7 @@ class MethodSharepoint(MethodBase): # Get documents from reference - ensure documentList is a list, not a string if isinstance(documentList, str): documentList = [documentList] # Convert string to list - chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) + chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList) if not chatDocuments: return ActionResult.isFailure(error="No documents found for the provided reference") @@ -1318,7 +1318,7 @@ class MethodSharepoint(MethodBase): for i, (chatDocument, fileName) in enumerate(zip(chatDocuments, fileNames)): try: fileId = chatDocument.fileId - file_data = self.services.workflow.getFileData(fileId) + file_data = self.services.chat.getFileData(fileId) if not file_data: logger.warning(f"File data not found for fileId: {fileId}") @@ -1481,14 +1481,14 @@ class MethodSharepoint(MethodBase): try: import json # Resolve the reference label to get the actual document list - document_list = self.services.workflow.getChatDocumentsFromDocumentList([pathObject]) + document_list = self.services.chat.getChatDocumentsFromDocumentList([pathObject]) if not document_list or len(document_list) == 0: return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}") # Get the first document's content (which should be the JSON) first_document = document_list[0] logger.info(f"Document fileId: {first_document.fileId}, fileName: {first_document.fileName}") - file_data = self.services.workflow.getFileData(first_document.fileId) + file_data = self.services.chat.getFileData(first_document.fileId) if not file_data: return ActionResult.isFailure(error=f"No file data found for document: {pathObject} (fileId: {first_document.fileId})") logger.info(f"File data length: {len(file_data) if file_data else 0}") diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py index 3dbe0d35..d1b523ea 100644 --- a/modules/workflows/processing/core/actionExecutor.py +++ b/modules/workflows/processing/core/actionExecutor.py @@ -6,6 +6,7 @@ from typing import Dict, Any, List from modules.datamodels.datamodelChat import ActionResult, ActionItem, TaskStep from modules.datamodels.datamodelChat import ChatWorkflow from modules.workflows.processing.shared.methodDiscovery import methods +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped logger = logging.getLogger(__name__) @@ -15,20 +16,6 @@ class ActionExecutor: def __init__(self, services): self.services = services - def _checkWorkflowStopped(self, workflow): - """Check if workflow has been stopped by user and raise exception if so""" - try: - # Get the current workflow status from the database to avoid stale data - current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id) - if current_workflow and current_workflow.status == "stopped": - logger.info("Workflow stopped by user, aborting action execution") - raise Exception("Workflow was stopped by user") - except Exception as e: - # If we can't get the current status due to other database issues, fall back to the in-memory object - logger.warning(f"Could not check current workflow status from database: {str(e)}") - if workflow and workflow.status == "stopped": - logger.info("Workflow stopped by user (from in-memory object), aborting action execution") - raise Exception("Workflow was stopped by user") async def executeAction(self, methodName: str, actionName: str, parameters: Dict[str, Any]) -> ActionResult: """Execute a method action""" @@ -70,7 +57,7 @@ class ActionExecutor: """Execute a single action and return ActionResult with enhanced document processing""" try: # Check workflow status before executing action - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Use passed indices or fallback to '?' taskNum = taskIndex if taskIndex is not None else '?' @@ -94,7 +81,7 @@ class ActionExecutor: logger.info(f"Expected formats: {action.expectedDocumentFormats}") # Check workflow status before executing the action - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) result = await self.executeAction( methodName=action.execMethod, @@ -156,7 +143,7 @@ class ActionExecutor: logger.error(f"Action failed: {result.error}") # Create database log entry for action failure (write-through + bind) - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": f"āŒ **Task {taskNum}**āŒ **Action {actionNum}/{totalActions}** failed: {result.error}", "type": "error", "progress": 100 diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py index e9489a79..ea0699ed 100644 --- a/modules/workflows/processing/core/messageCreator.py +++ b/modules/workflows/processing/core/messageCreator.py @@ -5,6 +5,7 @@ import logging from typing import Dict, Any, Optional, List from modules.datamodels.datamodelChat import TaskPlan, TaskStep, ActionResult, ReviewResult from modules.datamodels.datamodelChat import ChatWorkflow +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped logger = logging.getLogger(__name__) @@ -14,26 +15,12 @@ class MessageCreator: def __init__(self, services): self.services = services - def _checkWorkflowStopped(self, workflow): - """Check if workflow has been stopped by user and raise exception if so""" - try: - # Get the current workflow status from the database to avoid stale data - current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id) - if current_workflow and current_workflow.status == "stopped": - logger.info("Workflow stopped by user, aborting message creation") - raise Exception("Workflow was stopped by user") - except Exception as e: - # If we can't get the current status due to other database issues, fall back to the in-memory object - logger.warning(f"Could not check current workflow status from database: {str(e)}") - if workflow and workflow.status == "stopped": - logger.info("Workflow stopped by user (from in-memory object), aborting message creation") - raise Exception("Workflow was stopped by user") async def createTaskPlanMessage(self, taskPlan: TaskPlan, workflow: ChatWorkflow): """Create a chat message containing the task plan with user-friendly messages""" try: # Check workflow status before creating message - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Build task plan summary taskSummary = f"šŸ“‹ **Task Plan**\n\n" @@ -67,7 +54,7 @@ class MessageCreator: "taskProgress": "pending" } - self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) + self.services.chat.storeMessageWithDocuments(workflow, messageData, []) logger.info("Task plan message created successfully") except Exception as e: logger.error(f"Error creating task plan message: {str(e)}") @@ -76,7 +63,7 @@ class MessageCreator: """Create a task start message for the user""" try: # Check workflow status before creating message - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Create a task start message for the user taskProgress = f"{taskIndex}/{totalTasks}" if totalTasks is not None else str(taskIndex) @@ -101,7 +88,7 @@ class MessageCreator: if taskStep.userMessage: taskStartMessage["message"] += f"\n\nšŸ’¬ {taskStep.userMessage}" - self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, taskStartMessage, []) logger.info(f"Task start message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating task start message: {str(e)}") @@ -112,7 +99,7 @@ class MessageCreator: """Create and store a message for the action result in the workflow with enhanced document processing""" try: # Check workflow status before creating action message - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) if resultLabel is None: resultLabel = action.execResultLabel @@ -124,8 +111,8 @@ class MessageCreator: logger.info(f"Result label: {resultLabel} - No documents") # Get current workflow context and stats - workflowContext = self.services.workflow.getWorkflowContext() - workflowStats = self.services.workflow.getWorkflowStats() + workflowContext = self.services.chat.getWorkflowContext() + workflowStats = self.services.chat.getWorkflowStats() # Create a more meaningful message that includes task context taskObjective = taskStep.objective if taskStep else 'Unknown task' @@ -191,7 +178,7 @@ class MessageCreator: logger.info(f"Creating ERROR message: {messageText}") logger.info(f"Message data: {messageData}") - self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments) + self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocuments) logger.info(f"Message created: {action.execMethod}.{action.execAction}") except Exception as e: logger.error(f"Error creating action message: {str(e)}") @@ -201,7 +188,7 @@ class MessageCreator: """Create a task completion message for the user""" try: # Check workflow status before creating message - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Create a task completion message for the user taskProgress = str(taskIndex) @@ -234,7 +221,7 @@ class MessageCreator: "taskProgress": "success" } - self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, taskCompletionMessage, []) logger.info(f"Task completion message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating task completion message: {str(e)}") @@ -243,7 +230,7 @@ class MessageCreator: """Create a retry message for the user""" try: # Check workflow status before creating message - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Create retry message for user retryMessage = { @@ -261,7 +248,7 @@ class MessageCreator: "taskProgress": "retry" } - self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, retryMessage, []) logger.info(f"Retry message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating retry message: {str(e)}") @@ -270,7 +257,7 @@ class MessageCreator: """Create an error message for the user""" try: # Check workflow status before creating message - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Create user-facing error message for task failure errorMessage = f"**Task {taskIndex}**\n\nāŒ '{taskStep.objective}' failed\n\n" @@ -300,7 +287,7 @@ class MessageCreator: "taskProgress": "fail" } - self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) + self.services.chat.storeMessageWithDocuments(workflow, messageData, []) logger.info(f"Error message created for task {taskIndex}") except Exception as e: logger.error(f"Error creating error message: {str(e)}") diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py index 6e06aa56..6a73d971 100644 --- a/modules/workflows/processing/core/taskPlanner.py +++ b/modules/workflows/processing/core/taskPlanner.py @@ -10,6 +10,7 @@ from modules.workflows.processing.shared.promptGenerationTaskplan import ( generateTaskPlanningPrompt ) from modules.workflows.processing.adaptive import IntentAnalyzer +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped logger = logging.getLogger(__name__) @@ -19,26 +20,12 @@ class TaskPlanner: def __init__(self, services): self.services = services - def _checkWorkflowStopped(self, workflow): - """Check if workflow has been stopped by user and raise exception if so""" - try: - # Get the current workflow status from the database to avoid stale data - current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id) - if current_workflow and current_workflow.status == "stopped": - logger.info("Workflow stopped by user, aborting task planning") - raise Exception("Workflow was stopped by user") - except Exception as e: - # If we can't get the current status due to other database issues, fall back to the in-memory object - logger.warning(f"Could not check current workflow status from database: {str(e)}") - if workflow and workflow.status == "stopped": - logger.info("Workflow stopped by user (from in-memory object), aborting task planning") - raise Exception("Workflow was stopped by user") async def generateTaskPlan(self, userInput: str, workflow) -> TaskPlan: """Generate a high-level task plan for the workflow""" try: # Check workflow status before generating task plan - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) logger.info(f"=== STARTING TASK PLAN GENERATION ===") logger.info(f"Workflow ID: {workflow.id}") @@ -49,7 +36,7 @@ class TaskPlanner: logger.info(f"Actual User Prompt: {actualUserPrompt}") # Check workflow status before calling AI service - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Analyze user intent to obtain cleaned user objective for planning # This intent will be reused for workflow-level validation in executeTask diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py index 0a50926e..facad71b 100644 --- a/modules/workflows/processing/modes/modeActionplan.py +++ b/modules/workflows/processing/modes/modeActionplan.py @@ -13,6 +13,7 @@ from modules.datamodels.datamodelChat import ( from modules.datamodels.datamodelChat import ChatWorkflow from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, ProcessingModeEnum, PriorityEnum from modules.workflows.processing.modes.modeBase import BaseMode +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped from modules.workflows.processing.shared.executionState import TaskExecutionState from modules.workflows.processing.shared.promptGenerationActionsActionplan import ( generateActionDefinitionPrompt, @@ -26,8 +27,8 @@ logger = logging.getLogger(__name__) class ActionplanMode(BaseMode): """Actionplan mode implementation - batch planning and sequential execution""" - def __init__(self, services, workflow): - super().__init__(services, workflow) + def __init__(self, services): + super().__init__(services) # Initialize adaptive components for enhanced validation and learning self.intentAnalyzer = IntentAnalyzer(services) self.learningEngine = LearningEngine() @@ -42,7 +43,7 @@ class ActionplanMode(BaseMode): """Generate actions for a given task step using batch planning approach""" try: # Check workflow status before generating actions - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) retryInfo = f" (Retry #{enhancedContext.retryCount})" if enhancedContext and enhancedContext.retryCount > 0 else "" logger.info(f"Generating actions for task: {taskStep.objective}{retryInfo}") @@ -126,7 +127,7 @@ class ActionplanMode(BaseMode): ) # Check workflow status before calling AI service - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Build prompt bundle (template + placeholders) bundle = generateActionDefinitionPrompt(self.services, actionContext) @@ -262,7 +263,7 @@ class ActionplanMode(BaseMode): # Update workflow context for this task if taskIndex is not None: - self.services.workflow.setWorkflowContext(taskNumber=taskIndex) + self.services.chat.setWorkflowContext(taskNumber=taskIndex) # Create task start message await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks) @@ -275,7 +276,7 @@ class ActionplanMode(BaseMode): logger.info(f"Task execution attempt {attempt+1}/{maxRetries}") # Check workflow status before starting task execution - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Update retry context with current attempt information if retryContext: @@ -300,7 +301,7 @@ class ActionplanMode(BaseMode): actionResults = [] for actionIdx, action in enumerate(actions): # Check workflow status before each action execution - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Update workflow object before executing action actionNumber = actionIdx + 1 @@ -330,7 +331,7 @@ class ActionplanMode(BaseMode): if action.userMessage: actionStartMessage["message"] += f"\n\nšŸ’¬ {action.userMessage}" - self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, actionStartMessage, []) logger.info(f"Action start message created for action {actionNumber}") # Execute single action @@ -376,7 +377,7 @@ class ActionplanMode(BaseMode): state.addFailedAction(result) # Check workflow status before review - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) reviewResult = await self._reviewTaskCompletion(taskStep, actions, actionResults, workflow) success = reviewResult.status == 'success' @@ -479,7 +480,7 @@ class ActionplanMode(BaseMode): """Review task completion and determine success/failure/retry""" try: # Check workflow status before reviewing task completion - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) logger.info(f"=== STARTING TASK COMPLETION REVIEW ===") logger.info(f"Task: {taskStep.objective}") @@ -510,7 +511,7 @@ class ActionplanMode(BaseMode): ) # Check workflow status before calling AI service - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Build prompt bundle for result review bundle = generateResultReviewPrompt(reviewContext) diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py index 7e1c9ca2..7c93b43b 100644 --- a/modules/workflows/processing/modes/modeAutomation.py +++ b/modules/workflows/processing/modes/modeAutomation.py @@ -11,14 +11,15 @@ from modules.datamodels.datamodelChat import ( ) from modules.datamodels.datamodelChat import ChatWorkflow from modules.workflows.processing.modes.modeBase import BaseMode +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped logger = logging.getLogger(__name__) class AutomationMode(BaseMode): """Automation mode implementation - executes workflows from predefined plans""" - def __init__(self, services, workflow): - super().__init__(services, workflow) + def __init__(self, services): + super().__init__(services) # Store action lists for each task (mapped by task ID) self.taskActionMap: Dict[str, List[Dict[str, Any]]] = {} logger.info("AutomationMode initialized - will use predefined plan from workflow") @@ -192,12 +193,12 @@ class AutomationMode(BaseMode): try: # Check workflow status - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Update workflow before executing task if taskIndex is not None: self._updateWorkflowBeforeExecutingTask(taskIndex) - self.services.workflow.setWorkflowContext(taskNumber=taskIndex) + self.services.chat.setWorkflowContext(taskNumber=taskIndex) # Create task start message await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks) @@ -228,7 +229,7 @@ class AutomationMode(BaseMode): actionResults = [] for actionIdx, action in enumerate(actions): # Check workflow status before each action - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Update workflow before executing action actionNumber = actionIdx + 1 @@ -256,7 +257,7 @@ class AutomationMode(BaseMode): if action.userMessage: actionStartMessage["message"] += f"\n\nšŸ’¬ {action.userMessage}" - self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, actionStartMessage, []) # Execute action result = await self.actionExecutor.executeSingleAction( diff --git a/modules/workflows/processing/modes/modeBase.py b/modules/workflows/processing/modes/modeBase.py index 297052ed..b1e3d062 100644 --- a/modules/workflows/processing/modes/modeBase.py +++ b/modules/workflows/processing/modes/modeBase.py @@ -16,31 +16,13 @@ logger = logging.getLogger(__name__) class BaseMode(ABC): """Abstract base class for workflow execution modes""" - def __init__(self, services, workflow): + def __init__(self, services): self.services = services - self.workflow = workflow self.taskPlanner = TaskPlanner(services) self.actionExecutor = ActionExecutor(services) self.messageCreator = MessageCreator(services) self.validator = WorkflowValidator(services) - def _checkWorkflowStopped(self, workflow): - """Check if workflow has been stopped by user and raise exception if so""" - try: - # Get the current workflow status from the database to avoid stale data - current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id) - if current_workflow and current_workflow.status == "stopped": - logger.info("Workflow stopped by user, aborting execution") - raise Exception("Workflow was stopped by user") - except Exception as e: - # If this was the explicit stop signal, re-raise to abort immediately - if str(e) == "Workflow was stopped by user": - raise - # If we can't get the current status due to other database issues, fall back to the in-memory object - logger.warning(f"Could not check current workflow status from database: {str(e)}") - if workflow and workflow.status == "stopped": - logger.info("Workflow stopped by user (from in-memory object), aborting execution") - raise Exception("Workflow was stopped by user") @abstractmethod async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py index 94e34bec..b7e584da 100644 --- a/modules/workflows/processing/modes/modeDynamic.py +++ b/modules/workflows/processing/modes/modeDynamic.py @@ -13,6 +13,7 @@ from modules.datamodels.datamodelChat import ( ) from modules.datamodels.datamodelChat import ChatWorkflow from modules.workflows.processing.modes.modeBase import BaseMode +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped from modules.workflows.processing.shared.executionState import TaskExecutionState, shouldContinue from modules.workflows.processing.shared.promptGenerationActionsDynamic import ( generateDynamicPlanSelectionPrompt, @@ -28,8 +29,8 @@ logger = logging.getLogger(__name__) class DynamicMode(BaseMode): """Dynamic mode implementation - iterative plan-act-observe-refine loop""" - def __init__(self, services, workflow): - super().__init__(services, workflow) + def __init__(self, services): + super().__init__(services) # Initialize adaptive components self.intentAnalyzer = IntentAnalyzer(services) self.learningEngine = LearningEngine() @@ -87,7 +88,7 @@ class DynamicMode(BaseMode): decision = None while step <= state.max_steps: - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Update workflow[currentAction] for UI self._updateWorkflowBeforeExecutingAction(step) @@ -250,7 +251,7 @@ class DynamicMode(BaseMode): # Get available documents from the current workflow try: - available_docs = self.services.workflow.getAvailableDocuments(self.services.currentWorkflow) + available_docs = self.services.chat.getAvailableDocuments(self.services.workflow) if not available_docs or available_docs == "No documents available": logger.warning("No documents available for validation") return @@ -759,7 +760,7 @@ class DynamicMode(BaseMode): "actionProgress": actionProgress } - self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) + self.services.chat.storeMessageWithDocuments(workflow, messageData, []) except Exception as e: logger.error(f"Error creating Dynamic action message: {str(e)}") diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py index 7b838fc7..ed01ad4f 100644 --- a/modules/workflows/processing/shared/placeholderFactory.py +++ b/modules/workflows/processing/shared/placeholderFactory.py @@ -117,12 +117,12 @@ def extractUserPrompt(context: Any) -> str: return context.taskStep.objective or 'No request specified' return 'No request specified' -def extractWorkflowHistory(service: Any, context: Any) -> str: - """Extract workflow history from context. Maps to {{KEY:WORKFLOW_HISTORY}} +def extractWorkflowHistory(service: Any) -> str: + """Extract workflow history. Maps to {{KEY:WORKFLOW_HISTORY}} Reverse-chronological, enriched with message summaries and document labels. """ try: - history = getPreviousRoundContext(service, service.currentWorkflow) + history = getPreviousRoundContext(service) return history or "No previous workflow rounds available" except Exception as e: logger.error(f"Error getting workflow history: {str(e)}") @@ -229,13 +229,14 @@ def getMessageSummary(msg) -> str: except Exception: return "" -def getPreviousRoundContext(services, workflow: Any) -> str: +def getPreviousRoundContext(services) -> str: """Get enriched context: - Reverse-chronological ordering - Current round first (newest → oldest), then older rounds - Only messages with documents summarized - Include available documents snapshot at end """ + workflow = services.workflow try: if not workflow: return "No previous round context available" @@ -268,7 +269,7 @@ def getPreviousRoundContext(services, workflow: Any) -> str: # Include available documents snapshot at end try: if hasattr(services, 'workflow'): - docs_index = services.workflow.getAvailableDocuments(workflow) + docs_index = services.chat.getAvailableDocuments(workflow) if docs_index and docs_index != "No documents available": doc_count = docs_index.count("docItem:") # Only count actual documents, not document list labels lines.append(f"Available documents: {doc_count}") @@ -447,7 +448,7 @@ def extractLatestRefinementFeedback(context: Any) -> str: def extractAvailableDocumentsSummary(service: Any, context: Any) -> str: """Summary of available documents (count only).""" try: - documents = service.workflow.getAvailableDocuments(service.currentWorkflow) + documents = service.chat.getAvailableDocuments(service.workflow) if documents and documents != "No documents available": # Count only actual documents, not list labels doc_count = documents.count("docItem:") @@ -460,7 +461,7 @@ def extractAvailableDocumentsSummary(service: Any, context: Any) -> str: def extractAvailableDocumentsIndex(service: Any, context: Any) -> str: """Index of available documents with detailed references for parameter generation.""" try: - return service.workflow.getAvailableDocuments(service.currentWorkflow) + return service.chat.getAvailableDocuments(service.workflow) except Exception as e: logger.error(f"Error getting document index: {str(e)}") return "No documents available" diff --git a/modules/workflows/processing/shared/promptGenerationActionsActionplan.py b/modules/workflows/processing/shared/promptGenerationActionsActionplan.py index ac5732ef..002169e0 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsActionplan.py +++ b/modules/workflows/processing/shared/promptGenerationActionsActionplan.py @@ -24,7 +24,7 @@ def generateActionDefinitionPrompt(services, context: Any) -> PromptBundle: PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False), PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True), PromptPlaceholder(label="AVAILABLE_CONNECTIONS_INDEX", content=extractAvailableConnectionsIndex(services), summaryAllowed=False), - PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services, context), summaryAllowed=True), + PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services), summaryAllowed=True), PromptPlaceholder(label="AVAILABLE_METHODS", content=extractAvailableMethods(services), summaryAllowed=False), PromptPlaceholder(label="USER_LANGUAGE", content=extractUserLanguage(services), summaryAllowed=False), ] diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py index f0ffee3a..794f4175 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py +++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py @@ -32,7 +32,7 @@ def generateDynamicPlanSelectionPrompt(services, context: Any, learningEngine=No PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True), PromptPlaceholder(label="AVAILABLE_METHODS", content=extractAvailableMethods(services), summaryAllowed=False), # Provide enriched history context for Stage 1 to craft parametersContext - PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services, context), summaryAllowed=True), + PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services), summaryAllowed=True), # Provide deterministic indexes so the planner can choose exact labels PromptPlaceholder(label="AVAILABLE_DOCUMENTS_INDEX", content=extractAvailableDocumentsIndex(services, context), summaryAllowed=True), PromptPlaceholder(label="AVAILABLE_CONNECTIONS_INDEX", content=extractAvailableConnectionsIndex(services), summaryAllowed=False), diff --git a/modules/workflows/processing/shared/promptGenerationTaskplan.py b/modules/workflows/processing/shared/promptGenerationTaskplan.py index de101015..9a9008b7 100644 --- a/modules/workflows/processing/shared/promptGenerationTaskplan.py +++ b/modules/workflows/processing/shared/promptGenerationTaskplan.py @@ -23,7 +23,7 @@ def generateTaskPlanningPrompt(services, context: Any) -> PromptBundle: placeholders: List[PromptPlaceholder] = [ PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False), PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True), - PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services, context), summaryAllowed=True), + PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services), summaryAllowed=True), PromptPlaceholder(label="USER_LANGUAGE", content=userLanguage, summaryAllowed=False), ] diff --git a/modules/workflows/processing/shared/stateTools.py b/modules/workflows/processing/shared/stateTools.py new file mode 100644 index 00000000..1f625955 --- /dev/null +++ b/modules/workflows/processing/shared/stateTools.py @@ -0,0 +1,46 @@ +""" +State Tools +Shared utilities for workflow state management and validation. +""" + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + + +class WorkflowStoppedException(Exception): + """Exception raised when a workflow is stopped by the user.""" + pass + + +def checkWorkflowStopped(services: Any) -> None: + """ + Check if workflow has been stopped by user and raise exception if so. + + Args: + services: Services object with workflow and interfaceDbChat for fresh status check + + Raises: + WorkflowStoppedException: If workflow status is "stopped" + """ + workflow = services.workflow + if not workflow: + return + + try: + # Get the current workflow status from the database to avoid stale data + currentWorkflow = services.interfaceDbChat.getWorkflow(workflow.id) + if currentWorkflow and currentWorkflow.status == "stopped": + logger.info("Workflow stopped by user, aborting operation") + raise WorkflowStoppedException("Workflow was stopped by user") + except WorkflowStoppedException: + # Re-raise the stop signal immediately + raise + except Exception as e: + # If we can't get the current status due to other database issues, fall back to the in-memory object + logger.warning(f"Could not check current workflow status from database: {str(e)}") + if workflow and workflow.status == "stopped": + logger.info("Workflow stopped by user (from in-memory object), aborting operation") + raise WorkflowStoppedException("Workflow was stopped by user") + diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index e91f6afc..88bb25fd 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -9,45 +9,27 @@ from modules.workflows.processing.modes.modeBase import BaseMode from modules.workflows.processing.modes.modeActionplan import ActionplanMode from modules.workflows.processing.modes.modeDynamic import DynamicMode from modules.workflows.processing.modes.modeAutomation import AutomationMode +from modules.workflows.processing.shared.stateTools import checkWorkflowStopped logger = logging.getLogger(__name__) -class WorkflowStoppedException(Exception): - """Exception raised when a workflow is stopped by the user.""" - pass - class WorkflowProcessor: """Main workflow processor that delegates to appropriate mode implementations""" - def __init__(self, services, workflow=None): + def __init__(self, services): self.services = services - self.workflow = workflow - self.mode = self._createMode(workflow.workflowMode) + self.mode = self._createMode(services.workflow.workflowMode) def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode: """Create the appropriate mode implementation based on workflow mode""" if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC: - return DynamicMode(self.services, self.workflow) + return DynamicMode(self.services) elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN: - return ActionplanMode(self.services, self.workflow) + return ActionplanMode(self.services) elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION: - return AutomationMode(self.services, self.workflow) + return AutomationMode(self.services) else: raise ValueError(f"Invalid workflow mode: {workflowMode}") - def _checkWorkflowStopped(self, workflow): - """Check if workflow has been stopped by user and raise exception if so""" - try: - # Get the current workflow status from the database to avoid stale data - current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id) - if current_workflow and current_workflow.status == "stopped": - logger.info("Workflow stopped by user, aborting processing") - raise WorkflowStoppedException("Workflow was stopped by user") - except Exception as e: - # If we can't get the current status due to other database issues, fall back to the in-memory object - logger.warning(f"Could not check current workflow status from database: {str(e)}") - if workflow and workflow.status == "stopped": - logger.info("Workflow stopped by user (from in-memory object), aborting processing") - raise WorkflowStoppedException("Workflow was stopped by user") async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan: """Generate a high-level task plan for the workflow""" @@ -58,10 +40,10 @@ class WorkflowProcessor: try: # Check workflow status before generating task plan - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Start progress tracking - self.services.workflow.progressLogStart( + self.services.chat.progressLogStart( operationId, "Workflow Planning", "Task Plan Generation", @@ -78,25 +60,25 @@ class WorkflowProcessor: logger.info(f"Workflow Mode: {modeValue}") # Update progress - generating task plan - self.services.workflow.progressLogUpdate(operationId, 0.3, "Analyzing input") + self.services.chat.progressLogUpdate(operationId, 0.3, "Analyzing input") # Delegate to the appropriate mode taskPlan = await self.mode.generateTaskPlan(userInput, workflow) # Update progress - creating task plan message - self.services.workflow.progressLogUpdate(operationId, 0.8, "Creating plan") + self.services.chat.progressLogUpdate(operationId, 0.8, "Creating plan") # Create task plan message await self.mode.createTaskPlanMessage(taskPlan, workflow) # Complete progress tracking - self.services.workflow.progressLogFinish(operationId, True) + self.services.chat.progressLogFinish(operationId, True) return taskPlan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") # Complete progress tracking with failure - self.services.workflow.progressLogFinish(operationId, False) + self.services.chat.progressLogFinish(operationId, False) raise async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, @@ -109,10 +91,10 @@ class WorkflowProcessor: try: # Check workflow status before executing task - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Start progress tracking - self.services.workflow.progressLogStart( + self.services.chat.progressLogStart( operationId, "Workflow Execution", "Task Execution", @@ -125,19 +107,19 @@ class WorkflowProcessor: logger.info(f"Mode: {modeValue}") # Update progress - executing task - self.services.workflow.progressLogUpdate(operationId, 0.2, "Executing") + self.services.chat.progressLogUpdate(operationId, 0.2, "Executing") # Delegate to the appropriate mode result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks) # Complete progress tracking - self.services.workflow.progressLogFinish(operationId, True) + self.services.chat.progressLogFinish(operationId, True) return result except Exception as e: logger.error(f"Error in executeTask: {str(e)}") # Complete progress tracking with failure - self.services.workflow.progressLogFinish(operationId, False) + self.services.chat.progressLogFinish(operationId, False) raise async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow, @@ -145,7 +127,7 @@ class WorkflowProcessor: """Generate actions for a task step using the appropriate mode""" try: # Check workflow status before generating actions - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) logger.info(f"=== STARTING ACTION GENERATION ===") logger.info(f"Task: {taskStep.objective}") @@ -287,7 +269,7 @@ class WorkflowProcessor: """Prepare task handover data for workflow coordination""" try: # Check workflow status before preparing task handover - self._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) # Log handover status summary status = taskResult.status if taskResult else 'unknown' diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index fbf8bd69..edcce5e0 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -12,7 +12,8 @@ from modules.datamodels.datamodelChat import ( WorkflowModeEnum ) from modules.datamodels.datamodelChat import TaskContext -from modules.workflows.processing.workflowProcessor import WorkflowProcessor, WorkflowStoppedException +from modules.workflows.processing.workflowProcessor import WorkflowProcessor +from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped logger = logging.getLogger(__name__) @@ -34,22 +35,22 @@ class WorkflowManager: currentTime = self.services.utils.timestampGetUtc() if workflowId: - workflow = self.services.workflow.getWorkflow(workflowId) + workflow = self.services.chat.getWorkflow(workflowId) if not workflow: raise ValueError(f"Workflow {workflowId} not found") - # Store workflow in services for reference - self.services.currentWorkflow = workflow + # Store workflow in services for reference (this is the ChatWorkflow object) + self.services.workflow = workflow if workflow.status == "running": logger.info(f"Stopping running workflow {workflowId} before processing new prompt") workflow.status = "stopped" workflow.lastActivity = currentTime - self.services.workflow.updateWorkflow(workflowId, { + self.services.chat.updateWorkflow(workflowId, { "status": "stopped", "lastActivity": currentTime }) - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": "Workflow stopped for new prompt", "type": "info", "status": "stopped", @@ -57,7 +58,7 @@ class WorkflowManager: }) newRound = workflow.currentRound + 1 - self.services.workflow.updateWorkflow(workflowId, { + self.services.chat.updateWorkflow(workflowId, { "status": "running", "lastActivity": currentTime, "currentRound": newRound, @@ -70,7 +71,7 @@ class WorkflowManager: workflow.currentRound = newRound workflow.workflowMode = workflowMode - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}", "type": "info", "status": "running", @@ -94,14 +95,15 @@ class WorkflowManager: "maxSteps": 5 if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC else 1, # Set maxSteps for Dynamic mode } - workflow = self.services.workflow.createWorkflow(workflowData) + workflow = self.services.chat.createWorkflow(workflowData) logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") - self.services.currentWorkflow = workflow + # Store workflow in services (this is the ChatWorkflow object) + self.services.workflow = workflow # Start workflow processing asynchronously - asyncio.create_task(self._workflowProcess(userInput, workflow)) + asyncio.create_task(self._workflowProcess(userInput)) return workflow except Exception as e: @@ -111,17 +113,20 @@ class WorkflowManager: async def workflowStop(self, workflowId: str) -> ChatWorkflow: """Stops a running workflow.""" try: - workflow = self.services.workflow.getWorkflow(workflowId) + workflow = self.services.chat.getWorkflow(workflowId) if not workflow: raise ValueError(f"Workflow {workflowId} not found") + # Store workflow in services (this is the ChatWorkflow object) + self.services.workflow = workflow + workflow.status = "stopped" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflowId, { + self.services.chat.updateWorkflow(workflowId, { "status": "stopped", "lastActivity": workflow.lastActivity }) - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": "Workflow stopped", "type": "warning", "status": "stopped", @@ -134,34 +139,35 @@ class WorkflowManager: # Main processor - async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: + async def _workflowProcess(self, userInput: UserInputRequest) -> None: """Process a workflow with user input""" try: # Store the current user prompt in services for easy access throughout the workflow self.services.rawUserPrompt = userInput.prompt self.services.currentUserPrompt = userInput.prompt - # Update the workflow service with the current workflow context - self.services.workflow.setCurrentWorkflow(workflow) + # Reset progress logger for new workflow + self.services.chat._progressLogger = None - self.workflowProcessor = WorkflowProcessor(self.services, workflow) - await self._sendFirstMessage(userInput, workflow) - task_plan = await self._planTasks(userInput, workflow) - await self._executeTasks(task_plan, workflow) - await self._processWorkflowResults(workflow) + self.workflowProcessor = WorkflowProcessor(self.services) + await self._sendFirstMessage(userInput) + task_plan = await self._planTasks(userInput) + await self._executeTasks(task_plan) + await self._processWorkflowResults() except WorkflowStoppedException: - self._handleWorkflowStop(workflow) + self._handleWorkflowStop() except Exception as e: - self._handleWorkflowError(workflow, e) + self._handleWorkflowError(e) # Helper functions - async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: + async def _sendFirstMessage(self, userInput: UserInputRequest) -> None: """Send first message to start workflow""" try: - self.workflowProcessor._checkWorkflowStopped(workflow) + workflow = self.services.workflow + checkWorkflowStopped(self.services) # Create initial message using interface # For first user message, include round info in the user context label @@ -286,7 +292,7 @@ class WorkflowManager: self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes) # Collect file info - fileInfo = self.services.workflow.getFileInfo(fileItem.id) + fileInfo = self.services.chat.getFileInfo(fileItem.id) from modules.datamodels.datamodelChat import ChatDocument doc = ChatDocument( fileId=fileItem.id, @@ -310,14 +316,15 @@ class WorkflowManager: logger.warning(f"Failed to process user fileIds: {e}") # Finally, persist and bind the first message with combined documents (context + user) - self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocs) + self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs) except Exception as e: logger.error(f"Error sending first message: {str(e)}") raise - async def _planTasks(self, userInput: UserInputRequest, workflow: ChatWorkflow): + async def _planTasks(self, userInput: UserInputRequest): """Generate task plan for workflow execution""" + workflow = self.services.workflow handling = self.workflowProcessor # Generate task plan first (shared for both modes) taskPlan = await handling.generateTaskPlan(userInput.prompt, workflow) @@ -328,8 +335,9 @@ class WorkflowManager: logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks") return taskPlan - async def _executeTasks(self, taskPlan, workflow: ChatWorkflow) -> None: + async def _executeTasks(self, taskPlan) -> None: """Execute all tasks in the task plan and update workflow status.""" + workflow = self.services.workflow handling = self.workflowProcessor totalTasks = len(taskPlan.tasks) allTaskResults: List = [] @@ -377,11 +385,12 @@ class WorkflowManager: workflow.status = "completed" return None - async def _processWorkflowResults(self, workflow: ChatWorkflow) -> None: + async def _processWorkflowResults(self) -> None: """Process workflow results based on workflow status and create appropriate messages""" try: + workflow = self.services.workflow try: - self.workflowProcessor._checkWorkflowStopped(workflow) + checkWorkflowStopped(self.services) except WorkflowStoppedException: logger.info(f"Workflow {workflow.id} was stopped during result processing") @@ -403,12 +412,12 @@ class WorkflowManager: "taskProgress": "stopped", "actionProgress": "stopped" } - self.services.workflow.storeMessageWithDocuments(workflow, stoppedMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, []) # Update workflow status to stopped workflow.status = "stopped" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "stopped", "lastActivity": workflow.lastActivity }) @@ -433,12 +442,12 @@ class WorkflowManager: "taskProgress": "stopped", "actionProgress": "stopped" } - self.services.workflow.storeMessageWithDocuments(workflow, stoppedMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, stopped_message, []) # Update workflow status to stopped workflow.status = "stopped" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "stopped", "lastActivity": workflow.lastActivity, "totalTasks": workflow.totalTasks, @@ -446,7 +455,7 @@ class WorkflowManager: }) # Add stopped log entry - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": "Workflow stopped by user", "type": "warning", "status": "stopped", @@ -472,12 +481,12 @@ class WorkflowManager: "taskProgress": "fail", "actionProgress": "fail" } - self.services.workflow.storeMessageWithDocuments(workflow, errorMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, errorMessage, []) # Update workflow status to failed workflow.status = "failed" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "failed", "lastActivity": workflow.lastActivity, "totalTasks": workflow.totalTasks, @@ -485,7 +494,7 @@ class WorkflowManager: }) # Add failed log entry - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": "Workflow failed: Unknown error", "type": "error", "status": "failed", @@ -494,7 +503,7 @@ class WorkflowManager: return # For successful workflows, send detailed completion message - await self._sendLastMessage(workflow) + await self._sendLastMessage() except Exception as e: logger.error(f"Error processing workflow results: {str(e)}") @@ -516,28 +525,29 @@ class WorkflowManager: "taskProgress": "fail", "actionProgress": "fail" } - self.services.workflow.storeMessageWithDocuments(workflow, errorMessage, []) + self.services.chat.storeMessageWithDocuments(workflow, error_message, []) # Update workflow status to failed workflow.status = "failed" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "failed", "lastActivity": workflow.lastActivity, "totalTasks": workflow.totalTasks, "totalActions": workflow.totalActions }) - async def _sendLastMessage(self, workflow: ChatWorkflow) -> None: + async def _sendLastMessage(self) -> None: """Send last message to complete workflow (only for successful workflows)""" try: + workflow = self.services.workflow # Safety check: ensure this is only called for successful workflows if workflow.status in ['stopped', 'failed']: logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") return # Generate feedback - feedback = await self._generateWorkflowFeedback(workflow) + feedback = await self._generateWorkflowFeedback() # Create last message using interface messageData = { @@ -559,20 +569,20 @@ class WorkflowManager: } # Create message using interface - self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) + self.services.chat.storeMessageWithDocuments(workflow, messageData, []) # Update workflow status to completed workflow.status = "completed" workflow.lastActivity = self.services.utils.timestampGetUtc() # Update workflow in database - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "completed", "lastActivity": workflow.lastActivity }) # Add completion log entry - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": "Workflow completed", "type": "success", "status": "completed", @@ -583,10 +593,11 @@ class WorkflowManager: logger.error(f"Error sending last message: {str(e)}") raise - async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str: + async def _generateWorkflowFeedback(self) -> str: """Generate feedback message for workflow completion""" try: - self.workflowProcessor._checkWorkflowStopped(workflow) + workflow = self.services.workflow + checkWorkflowStopped(self.services) # Count messages by role userMessages = [msg for msg in workflow.messages if msg.role == 'user'] @@ -610,14 +621,15 @@ class WorkflowManager: logger.error(f"Error generating workflow feedback: {str(e)}") return "Workflow processing completed." - def _handleWorkflowStop(self, workflow: ChatWorkflow) -> None: + def _handleWorkflowStop(self) -> None: """Handle workflow stop exception""" + workflow = self.services.workflow logger.info("Workflow stopped by user") # Update workflow status to stopped workflow.status = "stopped" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "stopped", "lastActivity": workflow.lastActivity, "totalTasks": workflow.totalTasks, @@ -642,24 +654,25 @@ class WorkflowManager: "taskProgress": "pending", "actionProgress": "pending" } - self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, []) + self.services.chat.storeMessageWithDocuments(workflow, stopped_message, []) # Add log entry - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": "Workflow stopped by user", "type": "warning", "status": "stopped", "progress": 100 }) - def _handleWorkflowError(self, workflow: ChatWorkflow, error: Exception) -> None: + def _handleWorkflowError(self, error: Exception) -> None: """Handle workflow error exception""" + workflow = self.services.workflow logger.error(f"Workflow processing error: {str(error)}") # Update workflow status to failed workflow.status = "failed" workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.workflow.updateWorkflow(workflow.id, { + self.services.chat.updateWorkflow(workflow.id, { "status": "failed", "lastActivity": workflow.lastActivity, "totalTasks": workflow.totalTasks, @@ -684,10 +697,10 @@ class WorkflowManager: "taskProgress": "fail", "actionProgress": "fail" } - self.services.workflow.storeMessageWithDocuments(workflow, error_message, []) + self.services.chat.storeMessageWithDocuments(workflow, error_message, []) # Add error log entry - self.services.workflow.storeLog(workflow, { + self.services.chat.storeLog(workflow, { "message": f"Workflow failed: {str(error)}", "type": "error", "status": "failed", @@ -701,8 +714,8 @@ class WorkflowManager: documents = [] for fileId in fileIds: try: - # Get file info from unified workflow service - fileInfo = self.services.workflow.getFileInfo(fileId) + # Get file info from chat service + fileInfo = self.services.chat.getFileInfo(fileId) if fileInfo: # Create document directly with all file attributes document = ChatDocument( diff --git a/test4_method_ai_operations.py b/test4_method_ai_operations.py index 4de11ece..e4f91587 100644 --- a/test4_method_ai_operations.py +++ b/test4_method_ai_operations.py @@ -214,11 +214,6 @@ class MethodAiOperationsTester: print(f"Debug: services id: {id(self.services)}") print(f"Debug: methodAi.services id: {id(self.methodAi.services)}") - # Final safety check: ensure methodAi.services has the workflow - if hasattr(self.methodAi, 'services') and not self.methodAi.services.currentWorkflow: - print(f"āš ļø Fixing: Setting workflow in methodAi.services...") - self.methodAi.services.currentWorkflow = self.services.currentWorkflow - actionResult = await self.methodAi.process(parameters) endTime = asyncio.get_event_loop().time()