From 90663963ffffb6b3715c49bb74be6e87e124dd9e Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 4 Nov 2025 10:30:23 +0100
Subject: [PATCH] refactored and properly separated self.chat and self.workflow
---
modules/features/syncDelta/mainSyncDelta.py | 2 +-
modules/services/__init__.py | 4 +-
modules/services/serviceAi/mainServiceAi.py | 72 +++++-----
.../mainServiceChat.py} | 99 +++++++++----
.../mainServiceExtraction.py | 29 ++--
.../mainServiceGeneration.py | 18 +--
modules/services/serviceWeb/mainServiceWeb.py | 10 +-
modules/shared/progressLogger.py | 17 ++-
modules/workflows/methods/methodAi.py | 28 ++--
modules/workflows/methods/methodBase.py | 2 +-
modules/workflows/methods/methodOutlook.py | 25 ++--
modules/workflows/methods/methodSharepoint.py | 20 +--
.../processing/core/actionExecutor.py | 21 +--
.../processing/core/messageCreator.py | 43 ++----
.../workflows/processing/core/taskPlanner.py | 19 +--
.../processing/modes/modeActionplan.py | 23 +--
.../processing/modes/modeAutomation.py | 13 +-
.../workflows/processing/modes/modeBase.py | 20 +--
.../workflows/processing/modes/modeDynamic.py | 11 +-
.../processing/shared/placeholderFactory.py | 15 +-
.../promptGenerationActionsActionplan.py | 2 +-
.../shared/promptGenerationActionsDynamic.py | 2 +-
.../shared/promptGenerationTaskplan.py | 2 +-
.../workflows/processing/shared/stateTools.py | 46 ++++++
.../workflows/processing/workflowProcessor.py | 56 +++-----
modules/workflows/workflowManager.py | 133 ++++++++++--------
test4_method_ai_operations.py | 5 -
27 files changed, 380 insertions(+), 357 deletions(-)
rename modules/services/{serviceWorkflow/mainServiceWorkflow.py => serviceChat/mainServiceChat.py} (90%)
create mode 100644 modules/workflows/processing/shared/stateTools.py
diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py
index 2d6a2b1b..32f29922 100644
--- a/modules/features/syncDelta/mainSyncDelta.py
+++ b/modules/features/syncDelta/mainSyncDelta.py
@@ -83,7 +83,7 @@ class ManagerSyncDelta:
self.APP_ENV_TYPE = self.services.utils.configGet("APP_ENV_TYPE", "dev")
self.JIRA_API_TOKEN = self.services.utils.configGet("Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET", "")
# Resolve SharePoint connection for the configured user id
- self.sharepointConnection = self.services.workflow.getUserConnectionByExternalUsername("msft", self.SHAREPOINT_USER_ID)
+ self.sharepointConnection = self.services.chat.getUserConnectionByExternalUsername("msft", self.SHAREPOINT_USER_ID)
if not self.sharepointConnection:
logger.error(
f"No SharePoint connection found for user: {self.SHAREPOINT_USER_ID}"
diff --git a/modules/services/__init__.py b/modules/services/__init__.py
index 87b13207..3e33d208 100644
--- a/modules/services/__init__.py
+++ b/modules/services/__init__.py
@@ -75,8 +75,8 @@ class Services:
from .serviceTicket.mainServiceTicket import TicketService
self.ticket = PublicService(TicketService(self))
- from .serviceWorkflow.mainServiceWorkflow import WorkflowService
- self.workflow = PublicService(WorkflowService(self))
+ from .serviceChat.mainServiceChat import ChatService
+ self.chat = PublicService(ChatService(self))
from .serviceUtils.mainServiceUtils import UtilsService
self.utils = PublicService(UtilsService(self))
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 2cac2905..95c51779 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -205,11 +205,11 @@ Respond with ONLY a JSON object in this exact format:
# Update progress for iteration start
if operationId:
if iteration == 1:
- self.services.workflow.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}")
+ self.services.chat.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}")
else:
# For continuation iterations, show progress incrementally
baseProgress = 0.5 + (min(iteration - 1, maxIterations) / maxIterations * 0.4) # Progress from 0.5 to 0.9 over maxIterations iterations
- self.services.workflow.progressLogUpdate(operationId, baseProgress, f"Continuing generation (iteration {iteration})")
+ self.services.chat.progressLogUpdate(operationId, baseProgress, f"Continuing generation (iteration {iteration})")
# Build iteration prompt
if len(allSections) > 0 and promptBuilder and promptArgs:
@@ -227,7 +227,7 @@ Respond with ONLY a JSON object in this exact format:
# Make AI call
try:
if operationId and iteration == 1:
- self.services.workflow.progressLogUpdate(operationId, 0.51, "Calling AI model")
+ self.services.chat.progressLogUpdate(operationId, 0.51, "Calling AI model")
request = AiCallRequest(
prompt=iterationPrompt,
context="",
@@ -246,10 +246,10 @@ Respond with ONLY a JSON object in this exact format:
# Update progress after AI call
if operationId:
if iteration == 1:
- self.services.workflow.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})")
+ self.services.chat.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})")
else:
progress = 0.6 + (min(iteration - 1, 10) * 0.03)
- self.services.workflow.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})")
+ self.services.chat.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})")
# Write raw AI response to debug file
if iteration == 1:
@@ -258,8 +258,8 @@ Respond with ONLY a JSON object in this exact format:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration
- self.services.workflow.storeWorkflowStat(
- self.services.currentWorkflow,
+ self.services.chat.storeWorkflowStat(
+ self.services.workflow,
response,
f"ai.call.{debugPrefix}.iteration_{iteration}"
)
@@ -286,7 +286,7 @@ Respond with ONLY a JSON object in this exact format:
# Update progress after parsing
if operationId:
if extractedSections:
- self.services.workflow.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})")
+ self.services.chat.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})")
if not extractedSections:
# If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry
@@ -306,7 +306,7 @@ Respond with ONLY a JSON object in this exact format:
else:
# Done - build final result
if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)")
+ self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)")
logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections")
break
@@ -566,11 +566,11 @@ Respond with ONLY a JSON object in this exact format:
await self._ensureAiObjectsInitialized()
# Create separate operationId for detailed progress tracking
- workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+ workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
aiOperationId = f"ai_documents_{workflowId}_{int(time.time())}"
# Start progress tracking for this operation
- self.services.workflow.progressLogStart(
+ self.services.chat.progressLogStart(
aiOperationId,
"AI call with documents",
"Document Generation",
@@ -580,7 +580,7 @@ Respond with ONLY a JSON object in this exact format:
try:
if options is None or (hasattr(options, 'operationType') and options.operationType is None):
# Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None)
- self.services.workflow.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters")
options = await self._analyzePromptAndCreateOptions(prompt)
# Check operationType FIRST - some operations need direct routing (before document generation checks)
@@ -591,7 +591,7 @@ Respond with ONLY a JSON object in this exact format:
if isImageRequest:
# Image generation uses generic call path but bypasses document generation pipeline
- self.services.workflow.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation")
# Call via generic path (no looping for images)
request = AiCallRequest(
@@ -621,19 +621,19 @@ Respond with ONLY a JSON object in this exact format:
result = response.content
# Emit stats for image generation
- self.services.workflow.storeWorkflowStat(
- self.services.currentWorkflow,
+ self.services.chat.storeWorkflowStat(
+ self.services.workflow,
response,
f"ai.generate.image"
)
- self.services.workflow.progressLogUpdate(aiOperationId, 0.9, "Image generated")
- self.services.workflow.progressLogFinish(aiOperationId, True)
+ self.services.chat.progressLogUpdate(aiOperationId, 0.9, "Image generated")
+ self.services.chat.progressLogFinish(aiOperationId, True)
return result
else:
errorMsg = f"No image data returned: {response.content}"
logger.error(f"Error in AI image generation: {errorMsg}")
- self.services.workflow.progressLogFinish(aiOperationId, False)
+ self.services.chat.progressLogFinish(aiOperationId, False)
return {"success": False, "error": errorMsg}
# Handle WEB_SEARCH and WEB_CRAWL operations - route directly to connectors
@@ -645,7 +645,7 @@ Respond with ONLY a JSON object in this exact format:
# Web operations: prompt is already structured JSON (AiCallPromptWebSearch/WebCrawl)
# Route directly through centralized AI call - model selector chooses appropriate connector
# Connector parses the JSON prompt and executes the operation
- self.services.workflow.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}")
request = AiCallRequest(
prompt=prompt, # Pass raw JSON prompt unchanged - connector will parse it
@@ -658,19 +658,19 @@ Respond with ONLY a JSON object in this exact format:
# Extract result from response
if response.content:
# Emit stats for web operation
- self.services.workflow.storeWorkflowStat(
- self.services.currentWorkflow,
+ self.services.chat.storeWorkflowStat(
+ self.services.workflow,
response,
f"ai.{opType.name.lower()}"
)
- self.services.workflow.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed")
- self.services.workflow.progressLogFinish(aiOperationId, True)
+ self.services.chat.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed")
+ self.services.chat.progressLogFinish(aiOperationId, True)
return response.content
else:
errorMsg = f"No content returned from {opType.name}: {response.content}"
logger.error(f"Error in {opType.name}: {errorMsg}")
- self.services.workflow.progressLogFinish(aiOperationId, False)
+ self.services.chat.progressLogFinish(aiOperationId, False)
return {"success": False, "error": errorMsg}
# CRITICAL: For document generation with JSON templates, NEVER compress the prompt
@@ -685,13 +685,13 @@ Respond with ONLY a JSON object in this exact format:
if outputFormat:
# Use unified generation method for all document generation
if documents and len(documents) > 0:
- self.services.workflow.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents")
extracted_content = await self.callAiText(prompt, documents, options, aiOperationId)
else:
- self.services.workflow.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation")
extracted_content = None
- self.services.workflow.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt")
from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
# First call without continuation context
generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None)
@@ -704,7 +704,7 @@ Respond with ONLY a JSON object in this exact format:
"extracted_content": extracted_content
}
- self.services.workflow.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation")
generated_json = await self._callAiWithLooping(
generation_prompt,
options,
@@ -714,7 +714,7 @@ Respond with ONLY a JSON object in this exact format:
aiOperationId
)
- self.services.workflow.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON")
# Parse the generated JSON (extract fenced/embedded JSON first)
try:
extracted_json = self.services.utils.jsonExtractString(generated_json)
@@ -728,10 +728,10 @@ Respond with ONLY a JSON object in this exact format:
# Write the problematic JSON to debug file
self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
- self.services.workflow.progressLogFinish(aiOperationId, False)
+ self.services.chat.progressLogFinish(aiOperationId, False)
return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}
- self.services.workflow.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format")
# Render to final format using the existing renderer
try:
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
@@ -761,16 +761,16 @@ Respond with ONLY a JSON object in this exact format:
# Log AI response for debugging
self.services.utils.writeDebugFile(str(result), "document_generation_response", documents)
- self.services.workflow.progressLogFinish(aiOperationId, True)
+ self.services.chat.progressLogFinish(aiOperationId, True)
return result
except Exception as e:
logger.error(f"Error rendering document: {str(e)}")
- self.services.workflow.progressLogFinish(aiOperationId, False)
+ self.services.chat.progressLogFinish(aiOperationId, False)
return {"success": False, "error": f"Rendering failed: {str(e)}"}
# Handle text calls (no output format specified)
- self.services.workflow.progressLogUpdate(aiOperationId, 0.5, "Processing text call")
+ self.services.chat.progressLogUpdate(aiOperationId, 0.5, "Processing text call")
if documents:
# Use document processing for text calls with documents
result = await self.callAiText(prompt, documents, options, aiOperationId)
@@ -778,12 +778,12 @@ Respond with ONLY a JSON object in this exact format:
# Use shared core function for direct text calls
result = await self._callAiWithLooping(prompt, options, "text", None, None, aiOperationId)
- self.services.workflow.progressLogFinish(aiOperationId, True)
+ self.services.chat.progressLogFinish(aiOperationId, True)
return result
except Exception as e:
logger.error(f"Error in callAiDocuments: {str(e)}")
- self.services.workflow.progressLogFinish(aiOperationId, False)
+ self.services.chat.progressLogFinish(aiOperationId, False)
raise
async def callAiText(
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceChat/mainServiceChat.py
similarity index 90%
rename from modules/services/serviceWorkflow/mainServiceWorkflow.py
rename to modules/services/serviceChat/mainServiceChat.py
index 65019280..495985b5 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceChat/mainServiceChat.py
@@ -8,13 +8,13 @@ from modules.shared.progressLogger import ProgressLogger
logger = logging.getLogger(__name__)
-class WorkflowService:
+class ChatService:
"""Service class containing methods for document processing, chat operations, and workflow management"""
def __init__(self, serviceCenter):
self.services = serviceCenter
self.user = serviceCenter.user
- self.workflow = serviceCenter.workflow
+ # self.services.workflow is now the ChatWorkflow object (stable during workflow execution)
self.interfaceDbChat = serviceCenter.interfaceDbChat
self.interfaceDbComponent = serviceCenter.interfaceDbComponent
self.interfaceDbApp = serviceCenter.interfaceDbApp
@@ -23,11 +23,16 @@ class WorkflowService:
def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]:
"""Get ChatDocuments from a list of document references using all three formats."""
try:
- workflow = self.services.currentWorkflow
- workflow_id = workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'
- workflow_obj_id = id(workflow) if workflow else None
+ # Use self.services.workflow which is the ChatWorkflow object (stable during workflow execution)
+ workflow = self.services.workflow
+ if not workflow:
+ logger.error("getChatDocumentsFromDocumentList: No workflow available (self.services.workflow is not set)")
+ return []
+
+ workflow_id = workflow.id if hasattr(workflow, 'id') else 'NO_ID'
+ workflow_obj_id = id(workflow)
logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}")
- logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow_id}, workflow object id = {workflow_obj_id}")
+ logger.debug(f"getChatDocumentsFromDocumentList: using workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}")
# Debug: list available messages with their labels and document names
try:
@@ -59,6 +64,11 @@ class WorkflowService:
doc_id = parts[1]
# Find the document by ID
for message in workflow.messages:
+ # Validate message belongs to this workflow
+ msg_workflow_id = getattr(message, 'workflowId', None)
+ if msg_workflow_id and msg_workflow_id != workflow_id:
+ continue
+
if message.documents:
for doc in message.documents:
if doc.id == doc_id:
@@ -75,6 +85,11 @@ class WorkflowService:
# First try to find the message by ID in the current workflow
message_found = None
for message in workflow.messages:
+ # Validate message belongs to this workflow
+ msg_workflow_id = getattr(message, 'workflowId', None)
+ if msg_workflow_id and msg_workflow_id != workflow_id:
+ continue
+
if str(message.id) == message_id:
message_found = message
break
@@ -96,6 +111,12 @@ class WorkflowService:
label = parts[1]
message_found = None
for message in workflow.messages:
+ # Validate message belongs to this workflow
+ msg_workflow_id = getattr(message, 'workflowId', None)
+ if msg_workflow_id and msg_workflow_id != workflow_id:
+ logger.warning(f"Message {message.id} has workflowId {msg_workflow_id} but belongs to workflow {workflow_id}. Skipping.")
+ continue
+
msg_label = getattr(message, 'documentsLabel', None)
if msg_label == label:
message_found = message
@@ -120,6 +141,12 @@ class WorkflowService:
# In case of retries, we want the NEWEST message (most recent publishedAt)
matching_messages = []
for message in workflow.messages:
+ # Validate message belongs to this workflow
+ msg_workflow_id = getattr(message, 'workflowId', None)
+ if msg_workflow_id and msg_workflow_id != workflow_id:
+ logger.debug(f"Skipping message {message.id} with workflowId {msg_workflow_id} (expected {workflow_id})")
+ continue
+
msg_documents_label = getattr(message, 'documentsLabel', '')
# Check if this message's documentsLabel matches our reference
@@ -358,10 +385,13 @@ class WorkflowService:
def getWorkflowContext(self) -> Dict[str, int]:
"""Get current workflow context for document generation"""
try:
+ workflow = self.services.workflow
+ if not workflow:
+ return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}
return {
- 'currentRound': self.workflow.currentRound if hasattr(self.workflow, 'currentRound') else 0,
- 'currentTask': self.workflow.currentTask if hasattr(self.workflow, 'currentTask') else 0,
- 'currentAction': self.workflow.currentAction if hasattr(self.workflow, 'currentAction') else 0
+ 'currentRound': workflow.currentRound if hasattr(workflow, 'currentRound') else 0,
+ 'currentTask': workflow.currentTask if hasattr(workflow, 'currentTask') else 0,
+ 'currentAction': workflow.currentAction if hasattr(workflow, 'currentAction') else 0
}
except Exception as e:
logger.error(f"Error getting workflow context: {str(e)}")
@@ -370,7 +400,10 @@ class WorkflowService:
def setWorkflowContext(self, roundNumber: int = None, taskNumber: int = None, actionNumber: int = None):
"""Set current workflow context for document generation and routing"""
try:
- workflow = self.services.currentWorkflow
+ workflow = self.services.workflow
+ if not workflow:
+ logger.error("setWorkflowContext: No workflow available")
+ return
# Prepare update data
update_data = {}
@@ -396,15 +429,26 @@ class WorkflowService:
def getWorkflowStats(self) -> Dict[str, Any]:
"""Get comprehensive workflow statistics including current context"""
try:
+ workflow = self.services.workflow
workflow_context = self.getWorkflowContext()
+ if not workflow:
+ return {
+ 'currentRound': workflow_context['currentRound'],
+ 'currentTask': workflow_context['currentTask'],
+ 'currentAction': workflow_context['currentAction'],
+ 'totalTasks': 0,
+ 'totalActions': 0,
+ 'workflowStatus': 'unknown',
+ 'workflowId': 'unknown'
+ }
return {
'currentRound': workflow_context['currentRound'],
'currentTask': workflow_context['currentTask'],
'currentAction': workflow_context['currentAction'],
- 'totalTasks': self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 0,
- 'totalActions': self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 0,
- 'workflowStatus': self.workflow.status if hasattr(self.workflow, 'status') else 'unknown',
- 'workflowId': self.workflow.id if hasattr(self.workflow, 'id') else 'unknown'
+ 'totalTasks': workflow.totalTasks if hasattr(workflow, 'totalTasks') else 0,
+ 'totalActions': workflow.totalActions if hasattr(workflow, 'totalActions') else 0,
+ 'workflowStatus': workflow.status if hasattr(workflow, 'status') else 'unknown',
+ 'workflowId': workflow.id if hasattr(workflow, 'id') else 'unknown'
}
except Exception as e:
logger.error(f"Error getting workflow stats: {str(e)}")
@@ -532,7 +576,9 @@ class WorkflowService:
def getDocumentCount(self) -> str:
"""Get document count for task planning (matching old handlingTasks.py logic)"""
try:
- workflow = self.services.currentWorkflow
+ workflow = self.services.workflow
+ if not workflow:
+ return "No documents available"
# Count documents from all messages in the workflow (like old system)
total_docs = 0
@@ -551,7 +597,9 @@ class WorkflowService:
def getWorkflowHistoryContext(self) -> str:
"""Get workflow history context for task planning (matching old handlingTasks.py logic)"""
try:
- workflow = self.services.currentWorkflow
+ workflow = self.services.workflow
+ if not workflow:
+ return "No previous round context available"
# Check if there are any previous rounds by looking for "first" messages
has_previous_rounds = False
@@ -594,8 +642,7 @@ class WorkflowService:
workflow_id = workflow.id if hasattr(workflow, 'id') else 'NO_ID'
workflow_obj_id = id(workflow)
- current_workflow_obj_id = id(self.services.currentWorkflow) if self.services.currentWorkflow else None
- logger.debug(f"getAvailableDocuments: workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}, currentWorkflow object id = {current_workflow_obj_id}")
+ logger.debug(f"getAvailableDocuments: workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}")
# Use the provided workflow object directly to avoid database reload issues
# that can cause filename truncation. The workflow object should already be up-to-date.
@@ -832,22 +879,15 @@ class WorkflowService:
logger.error(f"Error getting connection reference list: {str(e)}")
return []
- def setCurrentWorkflow(self, workflow):
- """Set the current workflow reference for this service"""
- self.workflow = workflow
- # Reset progress logger for new workflow
- self._progressLogger = None
def _getProgressLogger(self):
"""Get or create the progress logger instance"""
if self._progressLogger is None:
- # Use currentWorkflow from self.services instead of self.workflow (which is self)
- workflow = getattr(self.services, 'currentWorkflow', None)
- self._progressLogger = ProgressLogger(self, workflow)
+ self._progressLogger = ProgressLogger(self.services)
return self._progressLogger
- def createProgressLogger(self, workflow) -> ProgressLogger:
- return ProgressLogger(self, workflow)
+ def createProgressLogger(self) -> ProgressLogger:
+ return ProgressLogger(self.services)
def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = ""):
"""Wrapper for ProgressLogger.startOperation"""
@@ -862,4 +902,5 @@ class WorkflowService:
def progressLogFinish(self, operationId: str, success: bool = True):
"""Wrapper for ProgressLogger.finishOperation"""
progressLogger = self._getProgressLogger()
- return progressLogger.finishOperation(operationId, success)
\ No newline at end of file
+ return progressLogger.finishOperation(operationId, success)
+
diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py
index 8217d246..4cc7702d 100644
--- a/modules/services/serviceExtraction/mainServiceExtraction.py
+++ b/modules/services/serviceExtraction/mainServiceExtraction.py
@@ -135,8 +135,8 @@ class ExtractionService:
errorCount=0
)
- self.services.workflow.storeWorkflowStat(
- self.services.currentWorkflow,
+ self.services.chat.storeWorkflowStat(
+ self.services.workflow,
aiResponse,
f"extraction.process.{doc.mimeType}"
)
@@ -422,9 +422,9 @@ class ExtractionService:
# Create operationId if not provided
if not operationId:
- workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+ workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
- self.services.workflow.progressLogStart(
+ self.services.chat.progressLogStart(
operationId,
"AI Text Extract",
"Document Processing",
@@ -452,35 +452,35 @@ class ExtractionService:
# Extract content WITHOUT chunking
if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
+ self.services.chat.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
extractionResult = self.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
if operationId:
- self.services.workflow.progressLogFinish(operationId, False)
+ self.services.chat.progressLogFinish(operationId, False)
return "[Error: No extraction results]"
# Process parts (not chunks) with model-aware AI calls
if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
+ self.services.chat.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId)
# Merge results using existing merging system
if operationId:
- self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
+ self.services.chat.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
mergedContent = self._mergePartResults(partResults, options)
# Save merged extraction content to debug
self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
if operationId:
- self.services.workflow.progressLogFinish(operationId, True)
+ self.services.chat.progressLogFinish(operationId, True)
return mergedContent
except Exception as e:
logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
if operationId:
- self.services.workflow.progressLogFinish(operationId, False)
+ self.services.chat.progressLogFinish(operationId, False)
raise
async def _processPartsWithMapping(
@@ -539,21 +539,22 @@ class ExtractionService:
if operationId and totalParts > 0:
processedCount[0] += 1
progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9
- self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
+ self.services.chat.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
# Create progress callback for chunking
def chunkingProgressCallback(chunkProgress: float, status: str):
"""Callback to log chunking progress as ChatLog entries"""
- if self.services.workflow and self.services.currentWorkflow:
+ workflow = self.services.workflow
+ if workflow:
logData = {
- "workflowId": self.services.currentWorkflow.id,
+ "workflowId": workflow.id,
"message": "Service AI",
"type": "info",
"status": status,
"progress": chunkProgress
}
try:
- self.services.workflow.storeLog(self.services.currentWorkflow, logData)
+ self.services.chat.storeLog(workflow, logData)
except Exception as e:
logger.warning(f"Failed to store chunking progress log: {e}")
diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py
index 9dddb49d..79730a0a 100644
--- a/modules/services/serviceGeneration/mainServiceGeneration.py
+++ b/modules/services/serviceGeneration/mainServiceGeneration.py
@@ -1,10 +1,7 @@
import logging
import uuid
-import time
-from typing import Any, Dict, List, Optional, Union, Tuple
+from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelChat import ChatDocument
-from modules.datamodels.datamodelAi import AiCallResponse
-from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.services.serviceGeneration.subDocumentUtility import (
getFileExtension,
getMimeTypeFromExtension,
@@ -19,11 +16,10 @@ class GenerationService:
def __init__(self, serviceCenter=None):
# Directly use interfaces from the provided service center (no self.service calls)
self.services = serviceCenter
- self.interfaceDbComponent = getattr(serviceCenter, 'interfaceDbComponent', None) if serviceCenter else None
- self.interfaceDbChat = getattr(serviceCenter, 'interfaceDbChat', None) if serviceCenter else None
- self.workflow = getattr(serviceCenter, 'workflow', None) if serviceCenter else None
+ self.interfaceDbComponent = serviceCenter.interfaceDbComponent
+ self.interfaceDbChat = serviceCenter.interfaceDbChat
- def processActionResultDocuments(self, action_result, action, workflow) -> List[Dict[str, Any]]:
+ def processActionResultDocuments(self, actionResult, action) -> List[Dict[str, Any]]:
"""
Process documents produced by AI actions and convert them to ChatDocument format.
This function handles AI-generated document data, not document references.
@@ -31,7 +27,7 @@ class GenerationService:
"""
try:
# Read documents from the standard documents field (not data.documents)
- documents = action_result.documents if action_result and hasattr(action_result, 'documents') else []
+ documents = actionResult.documents if actionResult and hasattr(actionResult, 'documents') else []
if not documents:
return []
@@ -69,13 +65,13 @@ class GenerationService:
logger.error(f"Error processing single document: {str(e)}")
return None
- def createDocumentsFromActionResult(self, action_result, action, workflow, message_id=None) -> List[Any]:
+ def createDocumentsFromActionResult(self, actionResult, action, workflow, message_id=None) -> List[Any]:
"""
Create actual document objects from action result and store them in the system.
Returns a list of created document objects with proper workflow context.
"""
try:
- processed_docs = self.processActionResultDocuments(action_result, action, workflow)
+ processed_docs = self.processActionResultDocuments(actionResult, action)
createdDocuments = []
for i, doc_data in enumerate(processed_docs):
diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py
index c5e05412..be7609e8 100644
--- a/modules/services/serviceWeb/mainServiceWeb.py
+++ b/modules/services/serviceWeb/mainServiceWeb.py
@@ -47,7 +47,7 @@ class WebService:
"""
try:
# Step 1: AI intention analysis - extract URLs and parameters from prompt
- self.services.workflow.progressLogUpdate(operationId, 0.1, "Analyzing research intent")
+ self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent")
analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth)
@@ -72,7 +72,7 @@ class WebService:
# Step 2: Search for URLs if needed (based on needsSearch flag)
if needsSearch and (not allUrls or len(allUrls) < maxNumberPages):
- self.services.workflow.progressLogUpdate(operationId, 0.3, "Searching for URLs")
+ self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs")
searchUrls = await self._performWebSearch(
instruction=instruction,
@@ -84,7 +84,7 @@ class WebService:
# Add search URLs to the list
allUrls.extend(searchUrls)
- self.services.workflow.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")
+ self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")
# Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
if len(allUrls) > maxNumberPages:
@@ -99,7 +99,7 @@ class WebService:
maxDepth = depthMap.get(finalResearchDepth.lower(), 2)
# Step 5: Crawl all URLs
- self.services.workflow.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
+ self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
crawlResult = await self._performWebCrawl(
instruction=instruction,
@@ -107,7 +107,7 @@ class WebService:
maxDepth=maxDepth
)
- self.services.workflow.progressLogUpdate(operationId, 0.9, "Consolidating results")
+ self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results")
# Return consolidated result
result = {
diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py
index e6ecdee7..4d1c890f 100644
--- a/modules/shared/progressLogger.py
+++ b/modules/shared/progressLogger.py
@@ -13,15 +13,13 @@ logger = logging.getLogger(__name__)
class ProgressLogger:
"""Centralized progress logger for workflow operations."""
- def __init__(self, workflowService, workflow):
+ def __init__(self, services):
"""Initialize progress logger.
Args:
- workflowService: WorkflowService instance for logging
- workflow: Workflow object to get workflowId from
+ services: Services object for accessing chat service and workflow
"""
- self.workflowService = workflowService
- self.workflow = workflow
+ self.services = services
self.activeOperations = {}
self.finishedOperations = set() # Track finished operations to avoid repeated warnings
@@ -115,8 +113,13 @@ class ProgressLogger:
op = self.activeOperations[operationId]
message = f"Service {op['service']}"
+ workflow = self.services.workflow
+ if not workflow:
+ logger.warning(f"Cannot log progress: no workflow available")
+ return
+
logData = {
- "workflowId": self.workflow.id,
+ "workflowId": workflow.id,
"message": message,
"type": "info",
"status": status,
@@ -124,7 +127,7 @@ class ProgressLogger:
}
try:
- self.workflowService.storeLog(self.workflow, logData)
+ self.services.chat.storeLog(workflow, logData)
except Exception as e:
logger.error(f"Failed to store progress log: {e}")
diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py
index b54e855b..a1206ba6 100644
--- a/modules/workflows/methods/methodAi.py
+++ b/modules/workflows/methods/methodAi.py
@@ -43,11 +43,11 @@ class MethodAi(MethodBase):
"""
try:
# Init progress logger
- workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+ workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"ai_process_{workflowId}_{int(time.time())}"
# Start progress tracking
- self.services.workflow.progressLogStart(
+ self.services.chat.progressLogStart(
operationId,
"Generate",
"AI Processing",
@@ -58,7 +58,7 @@ class MethodAi(MethodBase):
logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")
# Update progress - preparing parameters
- self.services.workflow.progressLogUpdate(operationId, 0.2, "Preparing parameters")
+ self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters")
documentList = parameters.get("documentList", [])
if isinstance(documentList, str):
@@ -79,17 +79,17 @@ class MethodAi(MethodBase):
logger.info(f"Using result type: {resultType} -> {output_extension}")
# Update progress - preparing documents
- self.services.workflow.progressLogUpdate(operationId, 0.3, "Preparing documents")
+ self.services.chat.progressLogUpdate(operationId, 0.3, "Preparing documents")
# Get ChatDocuments for AI service - let AI service handle all document processing
chatDocuments = []
if documentList:
- chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
if chatDocuments:
logger.info(f"Prepared {len(chatDocuments)} documents for AI processing")
# Update progress - preparing AI call
- self.services.workflow.progressLogUpdate(operationId, 0.4, "Preparing AI call")
+ self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call")
# Build options with only resultFormat - let service layer handle all other parameters
output_format = output_extension.replace('.', '') or 'txt'
@@ -99,7 +99,7 @@ class MethodAi(MethodBase):
)
# Update progress - calling AI
- self.services.workflow.progressLogUpdate(operationId, 0.6, "Calling AI")
+ self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI")
result = await self.services.ai.callAiDocuments(
prompt=aiPrompt,
@@ -109,7 +109,7 @@ class MethodAi(MethodBase):
)
# Update progress - processing result
- self.services.workflow.progressLogUpdate(operationId, 0.8, "Processing result")
+ self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result")
from modules.datamodels.datamodelChat import ActionDocument
@@ -147,7 +147,7 @@ class MethodAi(MethodBase):
final_documents = [action_document]
# Complete progress tracking
- self.services.workflow.progressLogFinish(operationId, True)
+ self.services.chat.progressLogFinish(operationId, True)
return ActionResult.isSuccess(documents=final_documents)
@@ -156,7 +156,7 @@ class MethodAi(MethodBase):
# Complete progress tracking with failure
try:
- self.services.workflow.progressLogFinish(operationId, False)
+ self.services.chat.progressLogFinish(operationId, False)
except:
pass # Don't fail on progress logging errors
@@ -186,10 +186,10 @@ class MethodAi(MethodBase):
return ActionResult.isFailure(error="Research prompt is required")
# Init progress logger
- operationId = f"web_research_{self.services.currentWorkflow.id}_{int(time.time())}"
+ operationId = f"web_research_{self.services.workflow.id}_{int(time.time())}"
# Start progress tracking
- self.services.workflow.progressLogStart(
+ self.services.chat.progressLogStart(
operationId,
"Web Research",
"Searching and Crawling",
@@ -207,7 +207,7 @@ class MethodAi(MethodBase):
)
# Complete progress tracking
- self.services.workflow.progressLogFinish(operationId, True)
+ self.services.chat.progressLogFinish(operationId, True)
# Get meaningful filename from research result (generated by intent analyzer)
suggestedFilename = result.get("suggested_filename")
@@ -249,7 +249,7 @@ class MethodAi(MethodBase):
except Exception as e:
logger.error(f"Error in web research: {str(e)}")
try:
- self.services.workflow.progressLogFinish(operationId, False)
+ self.services.chat.progressLogFinish(operationId, False)
except:
pass
return ActionResult.isFailure(error=str(e))
diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py
index 2850f370..3d6742aa 100644
--- a/modules/workflows/methods/methodBase.py
+++ b/modules/workflows/methods/methodBase.py
@@ -187,7 +187,7 @@ class MethodBase:
try:
# Get workflow context from services if not provided
if workflow_context is None and hasattr(self.services, 'workflow'):
- workflow_context = self.services.workflow.getWorkflowContext()
+ workflow_context = self.services.chat.getWorkflowContext()
# Extract round, task, action numbers
round_num = workflow_context.get('currentRound', 0) if workflow_context else 0
diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
index c7e2eb35..8e769b99 100644
--- a/modules/workflows/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -36,7 +36,7 @@ class MethodOutlook(MethodBase):
logger.debug(f"Getting Microsoft connection for reference: {connectionReference}")
# Get the connection from the service
- userConnection = self.services.workflow.getUserConnectionFromConnectionReference(connectionReference)
+ userConnection = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
if not userConnection:
logger.error(f"Connection not found: {connectionReference}")
return None
@@ -44,7 +44,7 @@ class MethodOutlook(MethodBase):
logger.debug(f"Found connection: {userConnection.id}, status: {userConnection.status.value}, authority: {userConnection.authority.value}")
# Get a fresh token for this connection
- token = self.services.workflow.getFreshConnectionToken(userConnection.id)
+ token = self.services.chat.getFreshConnectionToken(userConnection.id)
if not token:
logger.error(f"Fresh token not found for connection: {userConnection.id}")
logger.debug(f"Connection details: {userConnection}")
@@ -1136,7 +1136,7 @@ class MethodOutlook(MethodBase):
# Prepare documents for AI processing
chatDocuments = []
if documentList:
- chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
# Create AI prompt for email composition
# Build document reference list for AI with expanded list contents when possible
@@ -1144,13 +1144,12 @@ class MethodOutlook(MethodBase):
doc_list_text = ""
if doc_references:
lines = ["Available_Document_References:"]
- workflow_obj = getattr(self.services, 'currentWorkflow', None)
for ref in doc_references:
# Each item is a label: resolve to its document list and render contained items
- list_docs = self.services.workflow.getChatDocumentsFromDocumentList([ref]) or []
+ list_docs = self.services.chat.getChatDocumentsFromDocumentList([ref]) or []
if list_docs:
for d in list_docs:
- doc_ref_label = self.services.workflow.getDocumentReferenceFromChatDocument(d)
+ doc_ref_label = self.services.chat.getDocumentReferenceFromChatDocument(d)
lines.append(f"- {doc_ref_label}")
else:
lines.append(" - (no documents)")
@@ -1216,7 +1215,7 @@ Return JSON:
if documentList:
try:
available_refs = [documentList] if isinstance(documentList, str) else documentList
- available_docs = self.services.workflow.getChatDocumentsFromDocumentList(available_refs) or []
+ available_docs = self.services.chat.getChatDocumentsFromDocumentList(available_refs) or []
except Exception:
available_docs = []
@@ -1229,7 +1228,7 @@ Return JSON:
if ai_attachments:
try:
ai_refs = [ai_attachments] if isinstance(ai_attachments, str) else ai_attachments
- ai_docs = self.services.workflow.getChatDocumentsFromDocumentList(ai_refs) or []
+ ai_docs = self.services.chat.getChatDocumentsFromDocumentList(ai_refs) or []
except Exception:
ai_docs = []
@@ -1239,15 +1238,15 @@ Return JSON:
if selected_docs:
# Map selected ChatDocuments back to docItem references
- documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in selected_docs]
+ documentList = [self.services.chat.getDocumentReferenceFromChatDocument(d) for d in selected_docs]
logger.info(f"AI selected {len(documentList)} documents for attachment (resolved via ChatDocuments)")
else:
# No intersection; use all available documents
- documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs]
+ documentList = [self.services.chat.getDocumentReferenceFromChatDocument(d) for d in available_docs]
logger.warning("AI selected attachments not found in available documents, using all documents")
else:
# No AI selection; use all available documents
- documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs]
+ documentList = [self.services.chat.getDocumentReferenceFromChatDocument(d) for d in available_docs]
logger.warning("AI did not specify attachments, using all available documents")
else:
logger.info("No documents provided in documentList; skipping attachment processing")
@@ -1297,13 +1296,13 @@ Return JSON:
message["attachments"] = []
for attachment_ref in documentList:
# Get attachment document from service center
- attachment_docs = self.services.workflow.getChatDocumentsFromDocumentList([attachment_ref])
+ attachment_docs = self.services.chat.getChatDocumentsFromDocumentList([attachment_ref])
if attachment_docs:
for doc in attachment_docs:
file_id = getattr(doc, 'fileId', None)
if file_id:
try:
- file_content = self.services.workflow.getFileData(file_id)
+ file_content = self.services.chat.getFileData(file_id)
if file_content:
if isinstance(file_content, bytes):
content_bytes = file_content
diff --git a/modules/workflows/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py
index 11bca67a..dea73974 100644
--- a/modules/workflows/methods/methodSharepoint.py
+++ b/modules/workflows/methods/methodSharepoint.py
@@ -31,7 +31,7 @@ class MethodSharepoint(MethodBase):
def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]:
"""Get Microsoft connection from connection reference and configure SharePoint service"""
try:
- userConnection = self.services.workflow.getUserConnectionFromConnectionReference(connectionReference)
+ userConnection = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
if not userConnection:
logger.warning(f"No user connection found for reference: {connectionReference}")
return None
@@ -847,13 +847,13 @@ class MethodSharepoint(MethodBase):
try:
import json
# Resolve the reference label to get the actual document list
- document_list = self.services.workflow.getChatDocumentsFromDocumentList([pathObject])
+ document_list = self.services.chat.getChatDocumentsFromDocumentList([pathObject])
if not document_list or len(document_list) == 0:
return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}")
# Get the first document's content (which should be the JSON)
first_document = document_list[0]
- file_data = self.services.workflow.getFileData(first_document.fileId)
+ file_data = self.services.chat.getFileData(first_document.fileId)
if not file_data:
return ActionResult.isFailure(error=f"No file data found for document: {pathObject}")
@@ -881,7 +881,7 @@ class MethodSharepoint(MethodBase):
# Get documents from reference - ensure documentList is a list, not a string
# documentList is already normalized above
- chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
if not chatDocuments:
return ActionResult.isFailure(error="No documents found for the provided reference")
@@ -1127,13 +1127,13 @@ class MethodSharepoint(MethodBase):
try:
import json
# Resolve the reference label to get the actual document list
- document_list = self.services.workflow.getChatDocumentsFromDocumentList([pathObject])
+ document_list = self.services.chat.getChatDocumentsFromDocumentList([pathObject])
if not document_list or len(document_list) == 0:
return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}")
# Get the first document's content (which should be the JSON)
first_document = document_list[0]
- file_data = self.services.workflow.getFileData(first_document.fileId)
+ file_data = self.services.chat.getFileData(first_document.fileId)
if not file_data:
return ActionResult.isFailure(error=f"No file data found for document: {pathObject}")
@@ -1228,7 +1228,7 @@ class MethodSharepoint(MethodBase):
# Get documents from reference - ensure documentList is a list, not a string
if isinstance(documentList, str):
documentList = [documentList] # Convert string to list
- chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
+ chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
if not chatDocuments:
return ActionResult.isFailure(error="No documents found for the provided reference")
@@ -1318,7 +1318,7 @@ class MethodSharepoint(MethodBase):
for i, (chatDocument, fileName) in enumerate(zip(chatDocuments, fileNames)):
try:
fileId = chatDocument.fileId
- file_data = self.services.workflow.getFileData(fileId)
+ file_data = self.services.chat.getFileData(fileId)
if not file_data:
logger.warning(f"File data not found for fileId: {fileId}")
@@ -1481,14 +1481,14 @@ class MethodSharepoint(MethodBase):
try:
import json
# Resolve the reference label to get the actual document list
- document_list = self.services.workflow.getChatDocumentsFromDocumentList([pathObject])
+ document_list = self.services.chat.getChatDocumentsFromDocumentList([pathObject])
if not document_list or len(document_list) == 0:
return ActionResult.isFailure(error=f"No document list found for reference: {pathObject}")
# Get the first document's content (which should be the JSON)
first_document = document_list[0]
logger.info(f"Document fileId: {first_document.fileId}, fileName: {first_document.fileName}")
- file_data = self.services.workflow.getFileData(first_document.fileId)
+ file_data = self.services.chat.getFileData(first_document.fileId)
if not file_data:
return ActionResult.isFailure(error=f"No file data found for document: {pathObject} (fileId: {first_document.fileId})")
logger.info(f"File data length: {len(file_data) if file_data else 0}")
diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py
index 3dbe0d35..d1b523ea 100644
--- a/modules/workflows/processing/core/actionExecutor.py
+++ b/modules/workflows/processing/core/actionExecutor.py
@@ -6,6 +6,7 @@ from typing import Dict, Any, List
from modules.datamodels.datamodelChat import ActionResult, ActionItem, TaskStep
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.workflows.processing.shared.methodDiscovery import methods
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
logger = logging.getLogger(__name__)
@@ -15,20 +16,6 @@ class ActionExecutor:
def __init__(self, services):
self.services = services
- def _checkWorkflowStopped(self, workflow):
- """Check if workflow has been stopped by user and raise exception if so"""
- try:
- # Get the current workflow status from the database to avoid stale data
- current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id)
- if current_workflow and current_workflow.status == "stopped":
- logger.info("Workflow stopped by user, aborting action execution")
- raise Exception("Workflow was stopped by user")
- except Exception as e:
- # If we can't get the current status due to other database issues, fall back to the in-memory object
- logger.warning(f"Could not check current workflow status from database: {str(e)}")
- if workflow and workflow.status == "stopped":
- logger.info("Workflow stopped by user (from in-memory object), aborting action execution")
- raise Exception("Workflow was stopped by user")
async def executeAction(self, methodName: str, actionName: str, parameters: Dict[str, Any]) -> ActionResult:
"""Execute a method action"""
@@ -70,7 +57,7 @@ class ActionExecutor:
"""Execute a single action and return ActionResult with enhanced document processing"""
try:
# Check workflow status before executing action
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Use passed indices or fallback to '?'
taskNum = taskIndex if taskIndex is not None else '?'
@@ -94,7 +81,7 @@ class ActionExecutor:
logger.info(f"Expected formats: {action.expectedDocumentFormats}")
# Check workflow status before executing the action
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
result = await self.executeAction(
methodName=action.execMethod,
@@ -156,7 +143,7 @@ class ActionExecutor:
logger.error(f"Action failed: {result.error}")
# Create database log entry for action failure (write-through + bind)
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": f"ā **Task {taskNum}**ā **Action {actionNum}/{totalActions}** failed: {result.error}",
"type": "error",
"progress": 100
diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py
index e9489a79..ea0699ed 100644
--- a/modules/workflows/processing/core/messageCreator.py
+++ b/modules/workflows/processing/core/messageCreator.py
@@ -5,6 +5,7 @@ import logging
from typing import Dict, Any, Optional, List
from modules.datamodels.datamodelChat import TaskPlan, TaskStep, ActionResult, ReviewResult
from modules.datamodels.datamodelChat import ChatWorkflow
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
logger = logging.getLogger(__name__)
@@ -14,26 +15,12 @@ class MessageCreator:
def __init__(self, services):
self.services = services
- def _checkWorkflowStopped(self, workflow):
- """Check if workflow has been stopped by user and raise exception if so"""
- try:
- # Get the current workflow status from the database to avoid stale data
- current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id)
- if current_workflow and current_workflow.status == "stopped":
- logger.info("Workflow stopped by user, aborting message creation")
- raise Exception("Workflow was stopped by user")
- except Exception as e:
- # If we can't get the current status due to other database issues, fall back to the in-memory object
- logger.warning(f"Could not check current workflow status from database: {str(e)}")
- if workflow and workflow.status == "stopped":
- logger.info("Workflow stopped by user (from in-memory object), aborting message creation")
- raise Exception("Workflow was stopped by user")
async def createTaskPlanMessage(self, taskPlan: TaskPlan, workflow: ChatWorkflow):
"""Create a chat message containing the task plan with user-friendly messages"""
try:
# Check workflow status before creating message
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Build task plan summary
taskSummary = f"š **Task Plan**\n\n"
@@ -67,7 +54,7 @@ class MessageCreator:
"taskProgress": "pending"
}
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, [])
logger.info("Task plan message created successfully")
except Exception as e:
logger.error(f"Error creating task plan message: {str(e)}")
@@ -76,7 +63,7 @@ class MessageCreator:
"""Create a task start message for the user"""
try:
# Check workflow status before creating message
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Create a task start message for the user
taskProgress = f"{taskIndex}/{totalTasks}" if totalTasks is not None else str(taskIndex)
@@ -101,7 +88,7 @@ class MessageCreator:
if taskStep.userMessage:
taskStartMessage["message"] += f"\n\nš¬ {taskStep.userMessage}"
- self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, taskStartMessage, [])
logger.info(f"Task start message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating task start message: {str(e)}")
@@ -112,7 +99,7 @@ class MessageCreator:
"""Create and store a message for the action result in the workflow with enhanced document processing"""
try:
# Check workflow status before creating action message
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
if resultLabel is None:
resultLabel = action.execResultLabel
@@ -124,8 +111,8 @@ class MessageCreator:
logger.info(f"Result label: {resultLabel} - No documents")
# Get current workflow context and stats
- workflowContext = self.services.workflow.getWorkflowContext()
- workflowStats = self.services.workflow.getWorkflowStats()
+ workflowContext = self.services.chat.getWorkflowContext()
+ workflowStats = self.services.chat.getWorkflowStats()
# Create a more meaningful message that includes task context
taskObjective = taskStep.objective if taskStep else 'Unknown task'
@@ -191,7 +178,7 @@ class MessageCreator:
logger.info(f"Creating ERROR message: {messageText}")
logger.info(f"Message data: {messageData}")
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments)
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocuments)
logger.info(f"Message created: {action.execMethod}.{action.execAction}")
except Exception as e:
logger.error(f"Error creating action message: {str(e)}")
@@ -201,7 +188,7 @@ class MessageCreator:
"""Create a task completion message for the user"""
try:
# Check workflow status before creating message
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Create a task completion message for the user
taskProgress = str(taskIndex)
@@ -234,7 +221,7 @@ class MessageCreator:
"taskProgress": "success"
}
- self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
logger.info(f"Task completion message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating task completion message: {str(e)}")
@@ -243,7 +230,7 @@ class MessageCreator:
"""Create a retry message for the user"""
try:
# Check workflow status before creating message
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Create retry message for user
retryMessage = {
@@ -261,7 +248,7 @@ class MessageCreator:
"taskProgress": "retry"
}
- self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, retryMessage, [])
logger.info(f"Retry message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating retry message: {str(e)}")
@@ -270,7 +257,7 @@ class MessageCreator:
"""Create an error message for the user"""
try:
# Check workflow status before creating message
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Create user-facing error message for task failure
errorMessage = f"**Task {taskIndex}**\n\nā '{taskStep.objective}' failed\n\n"
@@ -300,7 +287,7 @@ class MessageCreator:
"taskProgress": "fail"
}
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, [])
logger.info(f"Error message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating error message: {str(e)}")
diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py
index 6e06aa56..6a73d971 100644
--- a/modules/workflows/processing/core/taskPlanner.py
+++ b/modules/workflows/processing/core/taskPlanner.py
@@ -10,6 +10,7 @@ from modules.workflows.processing.shared.promptGenerationTaskplan import (
generateTaskPlanningPrompt
)
from modules.workflows.processing.adaptive import IntentAnalyzer
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
logger = logging.getLogger(__name__)
@@ -19,26 +20,12 @@ class TaskPlanner:
def __init__(self, services):
self.services = services
- def _checkWorkflowStopped(self, workflow):
- """Check if workflow has been stopped by user and raise exception if so"""
- try:
- # Get the current workflow status from the database to avoid stale data
- current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id)
- if current_workflow and current_workflow.status == "stopped":
- logger.info("Workflow stopped by user, aborting task planning")
- raise Exception("Workflow was stopped by user")
- except Exception as e:
- # If we can't get the current status due to other database issues, fall back to the in-memory object
- logger.warning(f"Could not check current workflow status from database: {str(e)}")
- if workflow and workflow.status == "stopped":
- logger.info("Workflow stopped by user (from in-memory object), aborting task planning")
- raise Exception("Workflow was stopped by user")
async def generateTaskPlan(self, userInput: str, workflow) -> TaskPlan:
"""Generate a high-level task plan for the workflow"""
try:
# Check workflow status before generating task plan
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
logger.info(f"=== STARTING TASK PLAN GENERATION ===")
logger.info(f"Workflow ID: {workflow.id}")
@@ -49,7 +36,7 @@ class TaskPlanner:
logger.info(f"Actual User Prompt: {actualUserPrompt}")
# Check workflow status before calling AI service
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Analyze user intent to obtain cleaned user objective for planning
# This intent will be reused for workflow-level validation in executeTask
diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py
index 0a50926e..facad71b 100644
--- a/modules/workflows/processing/modes/modeActionplan.py
+++ b/modules/workflows/processing/modes/modeActionplan.py
@@ -13,6 +13,7 @@ from modules.datamodels.datamodelChat import (
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, ProcessingModeEnum, PriorityEnum
from modules.workflows.processing.modes.modeBase import BaseMode
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
from modules.workflows.processing.shared.executionState import TaskExecutionState
from modules.workflows.processing.shared.promptGenerationActionsActionplan import (
generateActionDefinitionPrompt,
@@ -26,8 +27,8 @@ logger = logging.getLogger(__name__)
class ActionplanMode(BaseMode):
"""Actionplan mode implementation - batch planning and sequential execution"""
- def __init__(self, services, workflow):
- super().__init__(services, workflow)
+ def __init__(self, services):
+ super().__init__(services)
# Initialize adaptive components for enhanced validation and learning
self.intentAnalyzer = IntentAnalyzer(services)
self.learningEngine = LearningEngine()
@@ -42,7 +43,7 @@ class ActionplanMode(BaseMode):
"""Generate actions for a given task step using batch planning approach"""
try:
# Check workflow status before generating actions
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
retryInfo = f" (Retry #{enhancedContext.retryCount})" if enhancedContext and enhancedContext.retryCount > 0 else ""
logger.info(f"Generating actions for task: {taskStep.objective}{retryInfo}")
@@ -126,7 +127,7 @@ class ActionplanMode(BaseMode):
)
# Check workflow status before calling AI service
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Build prompt bundle (template + placeholders)
bundle = generateActionDefinitionPrompt(self.services, actionContext)
@@ -262,7 +263,7 @@ class ActionplanMode(BaseMode):
# Update workflow context for this task
if taskIndex is not None:
- self.services.workflow.setWorkflowContext(taskNumber=taskIndex)
+ self.services.chat.setWorkflowContext(taskNumber=taskIndex)
# Create task start message
await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)
@@ -275,7 +276,7 @@ class ActionplanMode(BaseMode):
logger.info(f"Task execution attempt {attempt+1}/{maxRetries}")
# Check workflow status before starting task execution
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Update retry context with current attempt information
if retryContext:
@@ -300,7 +301,7 @@ class ActionplanMode(BaseMode):
actionResults = []
for actionIdx, action in enumerate(actions):
# Check workflow status before each action execution
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Update workflow object before executing action
actionNumber = actionIdx + 1
@@ -330,7 +331,7 @@ class ActionplanMode(BaseMode):
if action.userMessage:
actionStartMessage["message"] += f"\n\nš¬ {action.userMessage}"
- self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, actionStartMessage, [])
logger.info(f"Action start message created for action {actionNumber}")
# Execute single action
@@ -376,7 +377,7 @@ class ActionplanMode(BaseMode):
state.addFailedAction(result)
# Check workflow status before review
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
reviewResult = await self._reviewTaskCompletion(taskStep, actions, actionResults, workflow)
success = reviewResult.status == 'success'
@@ -479,7 +480,7 @@ class ActionplanMode(BaseMode):
"""Review task completion and determine success/failure/retry"""
try:
# Check workflow status before reviewing task completion
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
logger.info(f"=== STARTING TASK COMPLETION REVIEW ===")
logger.info(f"Task: {taskStep.objective}")
@@ -510,7 +511,7 @@ class ActionplanMode(BaseMode):
)
# Check workflow status before calling AI service
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Build prompt bundle for result review
bundle = generateResultReviewPrompt(reviewContext)
diff --git a/modules/workflows/processing/modes/modeAutomation.py b/modules/workflows/processing/modes/modeAutomation.py
index 7e1c9ca2..7c93b43b 100644
--- a/modules/workflows/processing/modes/modeAutomation.py
+++ b/modules/workflows/processing/modes/modeAutomation.py
@@ -11,14 +11,15 @@ from modules.datamodels.datamodelChat import (
)
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.workflows.processing.modes.modeBase import BaseMode
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
logger = logging.getLogger(__name__)
class AutomationMode(BaseMode):
"""Automation mode implementation - executes workflows from predefined plans"""
- def __init__(self, services, workflow):
- super().__init__(services, workflow)
+ def __init__(self, services):
+ super().__init__(services)
# Store action lists for each task (mapped by task ID)
self.taskActionMap: Dict[str, List[Dict[str, Any]]] = {}
logger.info("AutomationMode initialized - will use predefined plan from workflow")
@@ -192,12 +193,12 @@ class AutomationMode(BaseMode):
try:
# Check workflow status
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Update workflow before executing task
if taskIndex is not None:
self._updateWorkflowBeforeExecutingTask(taskIndex)
- self.services.workflow.setWorkflowContext(taskNumber=taskIndex)
+ self.services.chat.setWorkflowContext(taskNumber=taskIndex)
# Create task start message
await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)
@@ -228,7 +229,7 @@ class AutomationMode(BaseMode):
actionResults = []
for actionIdx, action in enumerate(actions):
# Check workflow status before each action
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Update workflow before executing action
actionNumber = actionIdx + 1
@@ -256,7 +257,7 @@ class AutomationMode(BaseMode):
if action.userMessage:
actionStartMessage["message"] += f"\n\nš¬ {action.userMessage}"
- self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, actionStartMessage, [])
# Execute action
result = await self.actionExecutor.executeSingleAction(
diff --git a/modules/workflows/processing/modes/modeBase.py b/modules/workflows/processing/modes/modeBase.py
index 297052ed..b1e3d062 100644
--- a/modules/workflows/processing/modes/modeBase.py
+++ b/modules/workflows/processing/modes/modeBase.py
@@ -16,31 +16,13 @@ logger = logging.getLogger(__name__)
class BaseMode(ABC):
"""Abstract base class for workflow execution modes"""
- def __init__(self, services, workflow):
+ def __init__(self, services):
self.services = services
- self.workflow = workflow
self.taskPlanner = TaskPlanner(services)
self.actionExecutor = ActionExecutor(services)
self.messageCreator = MessageCreator(services)
self.validator = WorkflowValidator(services)
- def _checkWorkflowStopped(self, workflow):
- """Check if workflow has been stopped by user and raise exception if so"""
- try:
- # Get the current workflow status from the database to avoid stale data
- current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id)
- if current_workflow and current_workflow.status == "stopped":
- logger.info("Workflow stopped by user, aborting execution")
- raise Exception("Workflow was stopped by user")
- except Exception as e:
- # If this was the explicit stop signal, re-raise to abort immediately
- if str(e) == "Workflow was stopped by user":
- raise
- # If we can't get the current status due to other database issues, fall back to the in-memory object
- logger.warning(f"Could not check current workflow status from database: {str(e)}")
- if workflow and workflow.status == "stopped":
- logger.info("Workflow stopped by user (from in-memory object), aborting execution")
- raise Exception("Workflow was stopped by user")
@abstractmethod
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py
index 94e34bec..b7e584da 100644
--- a/modules/workflows/processing/modes/modeDynamic.py
+++ b/modules/workflows/processing/modes/modeDynamic.py
@@ -13,6 +13,7 @@ from modules.datamodels.datamodelChat import (
)
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.workflows.processing.modes.modeBase import BaseMode
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
from modules.workflows.processing.shared.executionState import TaskExecutionState, shouldContinue
from modules.workflows.processing.shared.promptGenerationActionsDynamic import (
generateDynamicPlanSelectionPrompt,
@@ -28,8 +29,8 @@ logger = logging.getLogger(__name__)
class DynamicMode(BaseMode):
"""Dynamic mode implementation - iterative plan-act-observe-refine loop"""
- def __init__(self, services, workflow):
- super().__init__(services, workflow)
+ def __init__(self, services):
+ super().__init__(services)
# Initialize adaptive components
self.intentAnalyzer = IntentAnalyzer(services)
self.learningEngine = LearningEngine()
@@ -87,7 +88,7 @@ class DynamicMode(BaseMode):
decision = None
while step <= state.max_steps:
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Update workflow[currentAction] for UI
self._updateWorkflowBeforeExecutingAction(step)
@@ -250,7 +251,7 @@ class DynamicMode(BaseMode):
# Get available documents from the current workflow
try:
- available_docs = self.services.workflow.getAvailableDocuments(self.services.currentWorkflow)
+ available_docs = self.services.chat.getAvailableDocuments(self.services.workflow)
if not available_docs or available_docs == "No documents available":
logger.warning("No documents available for validation")
return
@@ -759,7 +760,7 @@ class DynamicMode(BaseMode):
"actionProgress": actionProgress
}
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, [])
except Exception as e:
logger.error(f"Error creating Dynamic action message: {str(e)}")
diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py
index 7b838fc7..ed01ad4f 100644
--- a/modules/workflows/processing/shared/placeholderFactory.py
+++ b/modules/workflows/processing/shared/placeholderFactory.py
@@ -117,12 +117,12 @@ def extractUserPrompt(context: Any) -> str:
return context.taskStep.objective or 'No request specified'
return 'No request specified'
-def extractWorkflowHistory(service: Any, context: Any) -> str:
- """Extract workflow history from context. Maps to {{KEY:WORKFLOW_HISTORY}}
+def extractWorkflowHistory(service: Any) -> str:
+ """Extract workflow history. Maps to {{KEY:WORKFLOW_HISTORY}}
Reverse-chronological, enriched with message summaries and document labels.
"""
try:
- history = getPreviousRoundContext(service, service.currentWorkflow)
+ history = getPreviousRoundContext(service)
return history or "No previous workflow rounds available"
except Exception as e:
logger.error(f"Error getting workflow history: {str(e)}")
@@ -229,13 +229,14 @@ def getMessageSummary(msg) -> str:
except Exception:
return ""
-def getPreviousRoundContext(services, workflow: Any) -> str:
+def getPreviousRoundContext(services) -> str:
"""Get enriched context:
- Reverse-chronological ordering
- Current round first (newest ā oldest), then older rounds
- Only messages with documents summarized
- Include available documents snapshot at end
"""
+ workflow = services.workflow
try:
if not workflow:
return "No previous round context available"
@@ -268,7 +269,7 @@ def getPreviousRoundContext(services, workflow: Any) -> str:
# Include available documents snapshot at end
try:
if hasattr(services, 'workflow'):
- docs_index = services.workflow.getAvailableDocuments(workflow)
+ docs_index = services.chat.getAvailableDocuments(workflow)
if docs_index and docs_index != "No documents available":
doc_count = docs_index.count("docItem:") # Only count actual documents, not document list labels
lines.append(f"Available documents: {doc_count}")
@@ -447,7 +448,7 @@ def extractLatestRefinementFeedback(context: Any) -> str:
def extractAvailableDocumentsSummary(service: Any, context: Any) -> str:
"""Summary of available documents (count only)."""
try:
- documents = service.workflow.getAvailableDocuments(service.currentWorkflow)
+ documents = service.chat.getAvailableDocuments(service.workflow)
if documents and documents != "No documents available":
# Count only actual documents, not list labels
doc_count = documents.count("docItem:")
@@ -460,7 +461,7 @@ def extractAvailableDocumentsSummary(service: Any, context: Any) -> str:
def extractAvailableDocumentsIndex(service: Any, context: Any) -> str:
"""Index of available documents with detailed references for parameter generation."""
try:
- return service.workflow.getAvailableDocuments(service.currentWorkflow)
+ return service.chat.getAvailableDocuments(service.workflow)
except Exception as e:
logger.error(f"Error getting document index: {str(e)}")
return "No documents available"
diff --git a/modules/workflows/processing/shared/promptGenerationActionsActionplan.py b/modules/workflows/processing/shared/promptGenerationActionsActionplan.py
index ac5732ef..002169e0 100644
--- a/modules/workflows/processing/shared/promptGenerationActionsActionplan.py
+++ b/modules/workflows/processing/shared/promptGenerationActionsActionplan.py
@@ -24,7 +24,7 @@ def generateActionDefinitionPrompt(services, context: Any) -> PromptBundle:
PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False),
PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True),
PromptPlaceholder(label="AVAILABLE_CONNECTIONS_INDEX", content=extractAvailableConnectionsIndex(services), summaryAllowed=False),
- PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services, context), summaryAllowed=True),
+ PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services), summaryAllowed=True),
PromptPlaceholder(label="AVAILABLE_METHODS", content=extractAvailableMethods(services), summaryAllowed=False),
PromptPlaceholder(label="USER_LANGUAGE", content=extractUserLanguage(services), summaryAllowed=False),
]
diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py
index f0ffee3a..794f4175 100644
--- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py
+++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py
@@ -32,7 +32,7 @@ def generateDynamicPlanSelectionPrompt(services, context: Any, learningEngine=No
PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True),
PromptPlaceholder(label="AVAILABLE_METHODS", content=extractAvailableMethods(services), summaryAllowed=False),
# Provide enriched history context for Stage 1 to craft parametersContext
- PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services, context), summaryAllowed=True),
+ PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services), summaryAllowed=True),
# Provide deterministic indexes so the planner can choose exact labels
PromptPlaceholder(label="AVAILABLE_DOCUMENTS_INDEX", content=extractAvailableDocumentsIndex(services, context), summaryAllowed=True),
PromptPlaceholder(label="AVAILABLE_CONNECTIONS_INDEX", content=extractAvailableConnectionsIndex(services), summaryAllowed=False),
diff --git a/modules/workflows/processing/shared/promptGenerationTaskplan.py b/modules/workflows/processing/shared/promptGenerationTaskplan.py
index de101015..9a9008b7 100644
--- a/modules/workflows/processing/shared/promptGenerationTaskplan.py
+++ b/modules/workflows/processing/shared/promptGenerationTaskplan.py
@@ -23,7 +23,7 @@ def generateTaskPlanningPrompt(services, context: Any) -> PromptBundle:
placeholders: List[PromptPlaceholder] = [
PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False),
PromptPlaceholder(label="AVAILABLE_DOCUMENTS_SUMMARY", content=extractAvailableDocumentsSummary(services, context), summaryAllowed=True),
- PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services, context), summaryAllowed=True),
+ PromptPlaceholder(label="WORKFLOW_HISTORY", content=extractWorkflowHistory(services), summaryAllowed=True),
PromptPlaceholder(label="USER_LANGUAGE", content=userLanguage, summaryAllowed=False),
]
diff --git a/modules/workflows/processing/shared/stateTools.py b/modules/workflows/processing/shared/stateTools.py
new file mode 100644
index 00000000..1f625955
--- /dev/null
+++ b/modules/workflows/processing/shared/stateTools.py
@@ -0,0 +1,46 @@
+"""
+State Tools
+Shared utilities for workflow state management and validation.
+"""
+
+import logging
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+
+class WorkflowStoppedException(Exception):
+ """Exception raised when a workflow is stopped by the user."""
+ pass
+
+
+def checkWorkflowStopped(services: Any) -> None:
+ """
+ Check if workflow has been stopped by user and raise exception if so.
+
+ Args:
+ services: Services object with workflow and interfaceDbChat for fresh status check
+
+ Raises:
+ WorkflowStoppedException: If workflow status is "stopped"
+ """
+ workflow = services.workflow
+ if not workflow:
+ return
+
+ try:
+ # Get the current workflow status from the database to avoid stale data
+ currentWorkflow = services.interfaceDbChat.getWorkflow(workflow.id)
+ if currentWorkflow and currentWorkflow.status == "stopped":
+ logger.info("Workflow stopped by user, aborting operation")
+ raise WorkflowStoppedException("Workflow was stopped by user")
+ except WorkflowStoppedException:
+ # Re-raise the stop signal immediately
+ raise
+ except Exception as e:
+ # If we can't get the current status due to other database issues, fall back to the in-memory object
+ logger.warning(f"Could not check current workflow status from database: {str(e)}")
+ if workflow and workflow.status == "stopped":
+ logger.info("Workflow stopped by user (from in-memory object), aborting operation")
+ raise WorkflowStoppedException("Workflow was stopped by user")
+
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index e91f6afc..88bb25fd 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -9,45 +9,27 @@ from modules.workflows.processing.modes.modeBase import BaseMode
from modules.workflows.processing.modes.modeActionplan import ActionplanMode
from modules.workflows.processing.modes.modeDynamic import DynamicMode
from modules.workflows.processing.modes.modeAutomation import AutomationMode
+from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
logger = logging.getLogger(__name__)
-class WorkflowStoppedException(Exception):
- """Exception raised when a workflow is stopped by the user."""
- pass
-
class WorkflowProcessor:
"""Main workflow processor that delegates to appropriate mode implementations"""
- def __init__(self, services, workflow=None):
+ def __init__(self, services):
self.services = services
- self.workflow = workflow
- self.mode = self._createMode(workflow.workflowMode)
+ self.mode = self._createMode(services.workflow.workflowMode)
def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
"""Create the appropriate mode implementation based on workflow mode"""
if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC:
- return DynamicMode(self.services, self.workflow)
+ return DynamicMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN:
- return ActionplanMode(self.services, self.workflow)
+ return ActionplanMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION:
- return AutomationMode(self.services, self.workflow)
+ return AutomationMode(self.services)
else:
raise ValueError(f"Invalid workflow mode: {workflowMode}")
- def _checkWorkflowStopped(self, workflow):
- """Check if workflow has been stopped by user and raise exception if so"""
- try:
- # Get the current workflow status from the database to avoid stale data
- current_workflow = self.services.interfaceDbChat.getWorkflow(workflow.id)
- if current_workflow and current_workflow.status == "stopped":
- logger.info("Workflow stopped by user, aborting processing")
- raise WorkflowStoppedException("Workflow was stopped by user")
- except Exception as e:
- # If we can't get the current status due to other database issues, fall back to the in-memory object
- logger.warning(f"Could not check current workflow status from database: {str(e)}")
- if workflow and workflow.status == "stopped":
- logger.info("Workflow stopped by user (from in-memory object), aborting processing")
- raise WorkflowStoppedException("Workflow was stopped by user")
async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
"""Generate a high-level task plan for the workflow"""
@@ -58,10 +40,10 @@ class WorkflowProcessor:
try:
# Check workflow status before generating task plan
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Start progress tracking
- self.services.workflow.progressLogStart(
+ self.services.chat.progressLogStart(
operationId,
"Workflow Planning",
"Task Plan Generation",
@@ -78,25 +60,25 @@ class WorkflowProcessor:
logger.info(f"Workflow Mode: {modeValue}")
# Update progress - generating task plan
- self.services.workflow.progressLogUpdate(operationId, 0.3, "Analyzing input")
+ self.services.chat.progressLogUpdate(operationId, 0.3, "Analyzing input")
# Delegate to the appropriate mode
taskPlan = await self.mode.generateTaskPlan(userInput, workflow)
# Update progress - creating task plan message
- self.services.workflow.progressLogUpdate(operationId, 0.8, "Creating plan")
+ self.services.chat.progressLogUpdate(operationId, 0.8, "Creating plan")
# Create task plan message
await self.mode.createTaskPlanMessage(taskPlan, workflow)
# Complete progress tracking
- self.services.workflow.progressLogFinish(operationId, True)
+ self.services.chat.progressLogFinish(operationId, True)
return taskPlan
except Exception as e:
logger.error(f"Error in generateTaskPlan: {str(e)}")
# Complete progress tracking with failure
- self.services.workflow.progressLogFinish(operationId, False)
+ self.services.chat.progressLogFinish(operationId, False)
raise
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
@@ -109,10 +91,10 @@ class WorkflowProcessor:
try:
# Check workflow status before executing task
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Start progress tracking
- self.services.workflow.progressLogStart(
+ self.services.chat.progressLogStart(
operationId,
"Workflow Execution",
"Task Execution",
@@ -125,19 +107,19 @@ class WorkflowProcessor:
logger.info(f"Mode: {modeValue}")
# Update progress - executing task
- self.services.workflow.progressLogUpdate(operationId, 0.2, "Executing")
+ self.services.chat.progressLogUpdate(operationId, 0.2, "Executing")
# Delegate to the appropriate mode
result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks)
# Complete progress tracking
- self.services.workflow.progressLogFinish(operationId, True)
+ self.services.chat.progressLogFinish(operationId, True)
return result
except Exception as e:
logger.error(f"Error in executeTask: {str(e)}")
# Complete progress tracking with failure
- self.services.workflow.progressLogFinish(operationId, False)
+ self.services.chat.progressLogFinish(operationId, False)
raise
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
@@ -145,7 +127,7 @@ class WorkflowProcessor:
"""Generate actions for a task step using the appropriate mode"""
try:
# Check workflow status before generating actions
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
logger.info(f"=== STARTING ACTION GENERATION ===")
logger.info(f"Task: {taskStep.objective}")
@@ -287,7 +269,7 @@ class WorkflowProcessor:
"""Prepare task handover data for workflow coordination"""
try:
# Check workflow status before preparing task handover
- self._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
# Log handover status summary
status = taskResult.status if taskResult else 'unknown'
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index fbf8bd69..edcce5e0 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -12,7 +12,8 @@ from modules.datamodels.datamodelChat import (
WorkflowModeEnum
)
from modules.datamodels.datamodelChat import TaskContext
-from modules.workflows.processing.workflowProcessor import WorkflowProcessor, WorkflowStoppedException
+from modules.workflows.processing.workflowProcessor import WorkflowProcessor
+from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped
logger = logging.getLogger(__name__)
@@ -34,22 +35,22 @@ class WorkflowManager:
currentTime = self.services.utils.timestampGetUtc()
if workflowId:
- workflow = self.services.workflow.getWorkflow(workflowId)
+ workflow = self.services.chat.getWorkflow(workflowId)
if not workflow:
raise ValueError(f"Workflow {workflowId} not found")
- # Store workflow in services for reference
- self.services.currentWorkflow = workflow
+ # Store workflow in services for reference (this is the ChatWorkflow object)
+ self.services.workflow = workflow
if workflow.status == "running":
logger.info(f"Stopping running workflow {workflowId} before processing new prompt")
workflow.status = "stopped"
workflow.lastActivity = currentTime
- self.services.workflow.updateWorkflow(workflowId, {
+ self.services.chat.updateWorkflow(workflowId, {
"status": "stopped",
"lastActivity": currentTime
})
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": "Workflow stopped for new prompt",
"type": "info",
"status": "stopped",
@@ -57,7 +58,7 @@ class WorkflowManager:
})
newRound = workflow.currentRound + 1
- self.services.workflow.updateWorkflow(workflowId, {
+ self.services.chat.updateWorkflow(workflowId, {
"status": "running",
"lastActivity": currentTime,
"currentRound": newRound,
@@ -70,7 +71,7 @@ class WorkflowManager:
workflow.currentRound = newRound
workflow.workflowMode = workflowMode
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}",
"type": "info",
"status": "running",
@@ -94,14 +95,15 @@ class WorkflowManager:
"maxSteps": 5 if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC else 1, # Set maxSteps for Dynamic mode
}
- workflow = self.services.workflow.createWorkflow(workflowData)
+ workflow = self.services.chat.createWorkflow(workflowData)
logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}")
logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}")
- self.services.currentWorkflow = workflow
+ # Store workflow in services (this is the ChatWorkflow object)
+ self.services.workflow = workflow
# Start workflow processing asynchronously
- asyncio.create_task(self._workflowProcess(userInput, workflow))
+ asyncio.create_task(self._workflowProcess(userInput))
return workflow
except Exception as e:
@@ -111,17 +113,20 @@ class WorkflowManager:
async def workflowStop(self, workflowId: str) -> ChatWorkflow:
"""Stops a running workflow."""
try:
- workflow = self.services.workflow.getWorkflow(workflowId)
+ workflow = self.services.chat.getWorkflow(workflowId)
if not workflow:
raise ValueError(f"Workflow {workflowId} not found")
+ # Store workflow in services (this is the ChatWorkflow object)
+ self.services.workflow = workflow
+
workflow.status = "stopped"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflowId, {
+ self.services.chat.updateWorkflow(workflowId, {
"status": "stopped",
"lastActivity": workflow.lastActivity
})
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": "Workflow stopped",
"type": "warning",
"status": "stopped",
@@ -134,34 +139,35 @@ class WorkflowManager:
# Main processor
- async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
+ async def _workflowProcess(self, userInput: UserInputRequest) -> None:
"""Process a workflow with user input"""
try:
# Store the current user prompt in services for easy access throughout the workflow
self.services.rawUserPrompt = userInput.prompt
self.services.currentUserPrompt = userInput.prompt
- # Update the workflow service with the current workflow context
- self.services.workflow.setCurrentWorkflow(workflow)
+ # Reset progress logger for new workflow
+ self.services.chat._progressLogger = None
- self.workflowProcessor = WorkflowProcessor(self.services, workflow)
- await self._sendFirstMessage(userInput, workflow)
- task_plan = await self._planTasks(userInput, workflow)
- await self._executeTasks(task_plan, workflow)
- await self._processWorkflowResults(workflow)
+ self.workflowProcessor = WorkflowProcessor(self.services)
+ await self._sendFirstMessage(userInput)
+ task_plan = await self._planTasks(userInput)
+ await self._executeTasks(task_plan)
+ await self._processWorkflowResults()
except WorkflowStoppedException:
- self._handleWorkflowStop(workflow)
+ self._handleWorkflowStop()
except Exception as e:
- self._handleWorkflowError(workflow, e)
+ self._handleWorkflowError(e)
# Helper functions
- async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
+ async def _sendFirstMessage(self, userInput: UserInputRequest) -> None:
"""Send first message to start workflow"""
try:
- self.workflowProcessor._checkWorkflowStopped(workflow)
+ workflow = self.services.workflow
+ checkWorkflowStopped(self.services)
# Create initial message using interface
# For first user message, include round info in the user context label
@@ -286,7 +292,7 @@ class WorkflowManager:
self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes)
# Collect file info
- fileInfo = self.services.workflow.getFileInfo(fileItem.id)
+ fileInfo = self.services.chat.getFileInfo(fileItem.id)
from modules.datamodels.datamodelChat import ChatDocument
doc = ChatDocument(
fileId=fileItem.id,
@@ -310,14 +316,15 @@ class WorkflowManager:
logger.warning(f"Failed to process user fileIds: {e}")
# Finally, persist and bind the first message with combined documents (context + user)
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocs)
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs)
except Exception as e:
logger.error(f"Error sending first message: {str(e)}")
raise
- async def _planTasks(self, userInput: UserInputRequest, workflow: ChatWorkflow):
+ async def _planTasks(self, userInput: UserInputRequest):
"""Generate task plan for workflow execution"""
+ workflow = self.services.workflow
handling = self.workflowProcessor
# Generate task plan first (shared for both modes)
taskPlan = await handling.generateTaskPlan(userInput.prompt, workflow)
@@ -328,8 +335,9 @@ class WorkflowManager:
logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks")
return taskPlan
- async def _executeTasks(self, taskPlan, workflow: ChatWorkflow) -> None:
+ async def _executeTasks(self, taskPlan) -> None:
"""Execute all tasks in the task plan and update workflow status."""
+ workflow = self.services.workflow
handling = self.workflowProcessor
totalTasks = len(taskPlan.tasks)
allTaskResults: List = []
@@ -377,11 +385,12 @@ class WorkflowManager:
workflow.status = "completed"
return None
- async def _processWorkflowResults(self, workflow: ChatWorkflow) -> None:
+ async def _processWorkflowResults(self) -> None:
"""Process workflow results based on workflow status and create appropriate messages"""
try:
+ workflow = self.services.workflow
try:
- self.workflowProcessor._checkWorkflowStopped(workflow)
+ checkWorkflowStopped(self.services)
except WorkflowStoppedException:
logger.info(f"Workflow {workflow.id} was stopped during result processing")
@@ -403,12 +412,12 @@ class WorkflowManager:
"taskProgress": "stopped",
"actionProgress": "stopped"
}
- self.services.workflow.storeMessageWithDocuments(workflow, stoppedMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, [])
# Update workflow status to stopped
workflow.status = "stopped"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "stopped",
"lastActivity": workflow.lastActivity
})
@@ -433,12 +442,12 @@ class WorkflowManager:
"taskProgress": "stopped",
"actionProgress": "stopped"
}
- self.services.workflow.storeMessageWithDocuments(workflow, stoppedMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])
# Update workflow status to stopped
workflow.status = "stopped"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "stopped",
"lastActivity": workflow.lastActivity,
"totalTasks": workflow.totalTasks,
@@ -446,7 +455,7 @@ class WorkflowManager:
})
# Add stopped log entry
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": "Workflow stopped by user",
"type": "warning",
"status": "stopped",
@@ -472,12 +481,12 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- self.services.workflow.storeMessageWithDocuments(workflow, errorMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, errorMessage, [])
# Update workflow status to failed
workflow.status = "failed"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "failed",
"lastActivity": workflow.lastActivity,
"totalTasks": workflow.totalTasks,
@@ -485,7 +494,7 @@ class WorkflowManager:
})
# Add failed log entry
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": "Workflow failed: Unknown error",
"type": "error",
"status": "failed",
@@ -494,7 +503,7 @@ class WorkflowManager:
return
# For successful workflows, send detailed completion message
- await self._sendLastMessage(workflow)
+ await self._sendLastMessage()
except Exception as e:
logger.error(f"Error processing workflow results: {str(e)}")
@@ -516,28 +525,29 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- self.services.workflow.storeMessageWithDocuments(workflow, errorMessage, [])
+ self.services.chat.storeMessageWithDocuments(workflow, error_message, [])
# Update workflow status to failed
workflow.status = "failed"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "failed",
"lastActivity": workflow.lastActivity,
"totalTasks": workflow.totalTasks,
"totalActions": workflow.totalActions
})
- async def _sendLastMessage(self, workflow: ChatWorkflow) -> None:
+ async def _sendLastMessage(self) -> None:
"""Send last message to complete workflow (only for successful workflows)"""
try:
+ workflow = self.services.workflow
# Safety check: ensure this is only called for successful workflows
if workflow.status in ['stopped', 'failed']:
logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
return
# Generate feedback
- feedback = await self._generateWorkflowFeedback(workflow)
+ feedback = await self._generateWorkflowFeedback()
# Create last message using interface
messageData = {
@@ -559,20 +569,20 @@ class WorkflowManager:
}
# Create message using interface
- self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ self.services.chat.storeMessageWithDocuments(workflow, messageData, [])
# Update workflow status to completed
workflow.status = "completed"
workflow.lastActivity = self.services.utils.timestampGetUtc()
# Update workflow in database
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "completed",
"lastActivity": workflow.lastActivity
})
# Add completion log entry
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": "Workflow completed",
"type": "success",
"status": "completed",
@@ -583,10 +593,11 @@ class WorkflowManager:
logger.error(f"Error sending last message: {str(e)}")
raise
- async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
+ async def _generateWorkflowFeedback(self) -> str:
"""Generate feedback message for workflow completion"""
try:
- self.workflowProcessor._checkWorkflowStopped(workflow)
+ workflow = self.services.workflow
+ checkWorkflowStopped(self.services)
# Count messages by role
userMessages = [msg for msg in workflow.messages if msg.role == 'user']
@@ -610,14 +621,15 @@ class WorkflowManager:
logger.error(f"Error generating workflow feedback: {str(e)}")
return "Workflow processing completed."
- def _handleWorkflowStop(self, workflow: ChatWorkflow) -> None:
+ def _handleWorkflowStop(self) -> None:
"""Handle workflow stop exception"""
+ workflow = self.services.workflow
logger.info("Workflow stopped by user")
# Update workflow status to stopped
workflow.status = "stopped"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "stopped",
"lastActivity": workflow.lastActivity,
"totalTasks": workflow.totalTasks,
@@ -642,24 +654,25 @@ class WorkflowManager:
"taskProgress": "pending",
"actionProgress": "pending"
}
- self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
+ self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])
# Add log entry
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": "Workflow stopped by user",
"type": "warning",
"status": "stopped",
"progress": 100
})
- def _handleWorkflowError(self, workflow: ChatWorkflow, error: Exception) -> None:
+ def _handleWorkflowError(self, error: Exception) -> None:
"""Handle workflow error exception"""
+ workflow = self.services.workflow
logger.error(f"Workflow processing error: {str(error)}")
# Update workflow status to failed
workflow.status = "failed"
workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.workflow.updateWorkflow(workflow.id, {
+ self.services.chat.updateWorkflow(workflow.id, {
"status": "failed",
"lastActivity": workflow.lastActivity,
"totalTasks": workflow.totalTasks,
@@ -684,10 +697,10 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
+ self.services.chat.storeMessageWithDocuments(workflow, error_message, [])
# Add error log entry
- self.services.workflow.storeLog(workflow, {
+ self.services.chat.storeLog(workflow, {
"message": f"Workflow failed: {str(error)}",
"type": "error",
"status": "failed",
@@ -701,8 +714,8 @@ class WorkflowManager:
documents = []
for fileId in fileIds:
try:
- # Get file info from unified workflow service
- fileInfo = self.services.workflow.getFileInfo(fileId)
+ # Get file info from chat service
+ fileInfo = self.services.chat.getFileInfo(fileId)
if fileInfo:
# Create document directly with all file attributes
document = ChatDocument(
diff --git a/test4_method_ai_operations.py b/test4_method_ai_operations.py
index 4de11ece..e4f91587 100644
--- a/test4_method_ai_operations.py
+++ b/test4_method_ai_operations.py
@@ -214,11 +214,6 @@ class MethodAiOperationsTester:
print(f"Debug: services id: {id(self.services)}")
print(f"Debug: methodAi.services id: {id(self.methodAi.services)}")
- # Final safety check: ensure methodAi.services has the workflow
- if hasattr(self.methodAi, 'services') and not self.methodAi.services.currentWorkflow:
- print(f"ā ļø Fixing: Setting workflow in methodAi.services...")
- self.methodAi.services.currentWorkflow = self.services.currentWorkflow
-
actionResult = await self.methodAi.process(parameters)
endTime = asyncio.get_event_loop().time()