From f80fddbf8a34ccc6b25539dab3798a3d44e6ff88 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 16 Oct 2025 23:59:27 +0200
Subject: [PATCH] Cleaned chat objects workflow, documents, messages, logs,
stats
---
.../serviceAi/subDocumentGeneration.py | 21 ++++---------------
.../serviceWorkflow/mainServiceWorkflow.py | 8 +++----
.../processing/core/actionExecutor.py | 3 ++-
.../processing/core/messageCreator.py | 14 ++++++-------
.../workflows/processing/modes/modeReact.py | 6 ------
modules/workflows/workflowManager.py | 17 +++++++--------
6 files changed, 23 insertions(+), 46 deletions(-)
diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py
index 504ba154..21dc9cd6 100644
--- a/modules/services/serviceAi/subDocumentGeneration.py
+++ b/modules/services/serviceAi/subDocumentGeneration.py
@@ -677,7 +677,7 @@ Return only the JSON response.
mimeType=file_info.get("mimeType", "application/json") if file_info else "application/json"
)
- # Create message referencing the file
+ # Create message referencing the file - include document in initial call
messageData = {
"workflowId": workflow.id,
"role": "assistant",
@@ -688,22 +688,9 @@ Return only the JSON response.
"documentsLabel": label,
"documents": []
}
- message = services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, [])
- if not message:
- return
-
- # Persist ChatDocument with messageId
- doc.messageId = message.id
- services.interfaceDbChat.createDocument(doc.to_dict())
-
- # Update message to include document
- try:
- if not message.documents:
- message.documents = []
- message.documents.append(doc)
- services.workflow.updateMessage(message.id, {"documents": [d.to_dict() for d in message.documents]})
- except Exception:
- pass
+
+ # Store message with document included from the start
+ services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, [doc])
except Exception:
# Non-fatal; ignore if storage or chat creation fails
return
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index 0b2851e0..589971c6 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -2,7 +2,7 @@ import logging
import uuid
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User, UserConnection
-from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat
+from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat, ChatLog
from modules.datamodels.datamodelChat import ChatContentExtracted
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
@@ -498,7 +498,7 @@ class WorkflowService:
# === Service-level transactions (DB write-through + in-memory sync) ===
- def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]):
+ def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]) -> ChatMessage:
"""Persist message and documents, then bind them into in-memory workflow (replace-by-id)."""
# Ensure workflowId on message
messageData = dict(messageData or {})
@@ -522,7 +522,7 @@ class WorkflowService:
workflow.messages.append(chatMessage)
return chatMessage
- def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> Any:
+ def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> ChatLog:
"""Persist ChatLog and map it into the in-memory workflow logs list."""
logData = dict(logData or {})
logData["workflowId"] = workflow.id
@@ -541,7 +541,7 @@ class WorkflowService:
workflow.logs.append(chatLog)
return chatLog
- def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> Any:
+ def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat:
"""Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list."""
try:
# Create ChatStat from AiCallResponse data
diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py
index 1060e954..fd649337 100644
--- a/modules/workflows/processing/core/actionExecutor.py
+++ b/modules/workflows/processing/core/actionExecutor.py
@@ -159,7 +159,8 @@ class ActionExecutor:
# Create database log entry for action failure (write-through + bind)
self.services.workflow.storeLog(workflow, {
"message": f"ā **Task {taskNum}**ā **Action {actionNum}/{totalActions}** failed: {result.error}",
- "type": "error"
+ "type": "error",
+ "progress": 100
})
# Log action summary
diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py
index 1bf75fb7..fe543bb3 100644
--- a/modules/workflows/processing/core/messageCreator.py
+++ b/modules/workflows/processing/core/messageCreator.py
@@ -67,7 +67,7 @@ class MessageCreator:
"taskProgress": "pending"
}
- message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
logger.info("Task plan message created successfully")
except Exception as e:
logger.error(f"Error creating task plan message: {str(e)}")
@@ -101,7 +101,7 @@ class MessageCreator:
if taskStep.userMessage:
taskStartMessage["message"] += f"\n\nš¬ {taskStep.userMessage}"
- message = self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, [])
logger.info(f"Task start message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating task start message: {str(e)}")
@@ -182,12 +182,10 @@ class MessageCreator:
logger.info(f"Creating ERROR message: {messageText}")
logger.info(f"Message data: {messageData}")
- message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments)
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments)
logger.info(f"Message created: {action.execMethod}.{action.execAction}")
- return message
except Exception as e:
logger.error(f"Error creating action message: {str(e)}")
- return None
async def createTaskCompletionMessage(self, taskStep: TaskStep, workflow: ChatWorkflow, taskIndex: int,
totalTasks: int, reviewResult: ReviewResult):
@@ -227,7 +225,7 @@ class MessageCreator:
"taskProgress": "success"
}
- message = self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
logger.info(f"Task completion message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating task completion message: {str(e)}")
@@ -254,7 +252,7 @@ class MessageCreator:
"taskProgress": "retry"
}
- message = self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, [])
logger.info(f"Retry message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating retry message: {str(e)}")
@@ -293,7 +291,7 @@ class MessageCreator:
"taskProgress": "fail"
}
- message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
logger.info(f"Error message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating error message: {str(e)}")
diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py
index 0503e56a..36ca90d9 100644
--- a/modules/workflows/processing/modes/modeReact.py
+++ b/modules/workflows/processing/modes/modeReact.py
@@ -147,12 +147,6 @@ class ReactMode(BaseMode):
context.improvements = []
context.improvements.append(f"Step {step}: {decision.get('reason')}")
- # Telemetry: simple duration per step
- duration = time.time() - t0
- self.services.workflow.storeLog(workflow, {
- "message": f"react_step_duration_sec={duration:.3f}",
- "type": "info"
- })
lastReviewDict = decision
# Create user-friendly message AFTER action execution
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index b47bfb97..88899e49 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -142,10 +142,10 @@ class WorkflowManager:
self.services.rawUserPrompt = userInput.prompt
self.services.currentUserPrompt = userInput.prompt
self.workflowProcessor = WorkflowProcessor(self.services, workflow)
- message = await self._sendFirstMessage(userInput, workflow)
+ await self._sendFirstMessage(userInput, workflow)
task_plan = await self._planTasks(userInput, workflow)
await self._executeTasks(task_plan, workflow)
- await self._processWorkflowResults(workflow, message)
+ await self._processWorkflowResults(workflow)
except WorkflowStoppedException:
self._handleWorkflowStop(workflow)
@@ -155,7 +155,7 @@ class WorkflowManager:
# Helper functions
- async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage:
+ async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
"""Send first message to start workflow"""
try:
self.workflowProcessor._checkWorkflowStopped(workflow)
@@ -270,7 +270,7 @@ class WorkflowManager:
except Exception:
pass
- # Create documents for context items (in-memory ChatDocument; persistence via storeMessageWithDocuments)
+ # Create documents for context items
if contextItems and isinstance(contextItems, list):
for idx, item in enumerate(contextItems):
try:
@@ -317,8 +317,7 @@ class WorkflowManager:
logger.warning(f"Failed to process user fileIds: {e}")
# Finally, persist and bind the first message with combined documents (context + user)
- created_message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs)
- return created_message
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs)
except Exception as e:
logger.error(f"Error sending first message: {str(e)}")
@@ -385,7 +384,7 @@ class WorkflowManager:
workflow.status = "completed"
return None
- async def _processWorkflowResults(self, workflow: ChatWorkflow, initial_message: ChatMessage) -> None:
+ async def _processWorkflowResults(self, workflow: ChatWorkflow) -> None:
"""Process workflow results based on workflow status and create appropriate messages"""
try:
try:
@@ -567,9 +566,7 @@ class WorkflowManager:
}
# Create message using interface
- message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
- if message:
- self.services.workflow.storeMessageWithDocuments(workflow, message.__dict__, getattr(message, 'documents', []))
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
# Update workflow status to completed
workflow.status = "completed"