Cleaned chat objects workflow, documents, messages, logs, stats

This commit is contained in:
ValueOn AG 2025-10-16 23:59:27 +02:00
parent 1bb02880df
commit f80fddbf8a
6 changed files with 23 additions and 46 deletions

View file

@ -677,7 +677,7 @@ Return only the JSON response.
mimeType=file_info.get("mimeType", "application/json") if file_info else "application/json" mimeType=file_info.get("mimeType", "application/json") if file_info else "application/json"
) )
# Create message referencing the file # Create message referencing the file - include document in initial call
messageData = { messageData = {
"workflowId": workflow.id, "workflowId": workflow.id,
"role": "assistant", "role": "assistant",
@ -688,22 +688,9 @@ Return only the JSON response.
"documentsLabel": label, "documentsLabel": label,
"documents": [] "documents": []
} }
message = services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, [])
if not message:
return
# Persist ChatDocument with messageId # Store message with document included from the start
doc.messageId = message.id services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, [doc])
services.interfaceDbChat.createDocument(doc.to_dict())
# Update message to include document
try:
if not message.documents:
message.documents = []
message.documents.append(doc)
services.workflow.updateMessage(message.id, {"documents": [d.to_dict() for d in message.documents]})
except Exception:
pass
except Exception: except Exception:
# Non-fatal; ignore if storage or chat creation fails # Non-fatal; ignore if storage or chat creation fails
return return

View file

@ -2,7 +2,7 @@ import logging
import uuid import uuid
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User, UserConnection from modules.datamodels.datamodelUam import User, UserConnection
from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat, ChatLog
from modules.datamodels.datamodelChat import ChatContentExtracted from modules.datamodels.datamodelChat import ChatContentExtracted
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
@ -498,7 +498,7 @@ class WorkflowService:
# === Service-level transactions (DB write-through + in-memory sync) === # === Service-level transactions (DB write-through + in-memory sync) ===
def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]): def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]) -> ChatMessage:
"""Persist message and documents, then bind them into in-memory workflow (replace-by-id).""" """Persist message and documents, then bind them into in-memory workflow (replace-by-id)."""
# Ensure workflowId on message # Ensure workflowId on message
messageData = dict(messageData or {}) messageData = dict(messageData or {})
@ -522,7 +522,7 @@ class WorkflowService:
workflow.messages.append(chatMessage) workflow.messages.append(chatMessage)
return chatMessage return chatMessage
def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> Any: def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> ChatLog:
"""Persist ChatLog and map it into the in-memory workflow logs list.""" """Persist ChatLog and map it into the in-memory workflow logs list."""
logData = dict(logData or {}) logData = dict(logData or {})
logData["workflowId"] = workflow.id logData["workflowId"] = workflow.id
@ -541,7 +541,7 @@ class WorkflowService:
workflow.logs.append(chatLog) workflow.logs.append(chatLog)
return chatLog return chatLog
def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> Any: def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat:
"""Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list.""" """Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list."""
try: try:
# Create ChatStat from AiCallResponse data # Create ChatStat from AiCallResponse data

View file

@ -159,7 +159,8 @@ class ActionExecutor:
# Create database log entry for action failure (write-through + bind) # Create database log entry for action failure (write-through + bind)
self.services.workflow.storeLog(workflow, { self.services.workflow.storeLog(workflow, {
"message": f"❌ **Task {taskNum}**❌ **Action {actionNum}/{totalActions}** failed: {result.error}", "message": f"❌ **Task {taskNum}**❌ **Action {actionNum}/{totalActions}** failed: {result.error}",
"type": "error" "type": "error",
"progress": 100
}) })
# Log action summary # Log action summary

View file

@ -67,7 +67,7 @@ class MessageCreator:
"taskProgress": "pending" "taskProgress": "pending"
} }
message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
logger.info("Task plan message created successfully") logger.info("Task plan message created successfully")
except Exception as e: except Exception as e:
logger.error(f"Error creating task plan message: {str(e)}") logger.error(f"Error creating task plan message: {str(e)}")
@ -101,7 +101,7 @@ class MessageCreator:
if taskStep.userMessage: if taskStep.userMessage:
taskStartMessage["message"] += f"\n\n💬 {taskStep.userMessage}" taskStartMessage["message"] += f"\n\n💬 {taskStep.userMessage}"
message = self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, []) self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, [])
logger.info(f"Task start message created for task {taskIndex}") logger.info(f"Task start message created for task {taskIndex}")
except Exception as e: except Exception as e:
logger.error(f"Error creating task start message: {str(e)}") logger.error(f"Error creating task start message: {str(e)}")
@ -182,12 +182,10 @@ class MessageCreator:
logger.info(f"Creating ERROR message: {messageText}") logger.info(f"Creating ERROR message: {messageText}")
logger.info(f"Message data: {messageData}") logger.info(f"Message data: {messageData}")
message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments) self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments)
logger.info(f"Message created: {action.execMethod}.{action.execAction}") logger.info(f"Message created: {action.execMethod}.{action.execAction}")
return message
except Exception as e: except Exception as e:
logger.error(f"Error creating action message: {str(e)}") logger.error(f"Error creating action message: {str(e)}")
return None
async def createTaskCompletionMessage(self, taskStep: TaskStep, workflow: ChatWorkflow, taskIndex: int, async def createTaskCompletionMessage(self, taskStep: TaskStep, workflow: ChatWorkflow, taskIndex: int,
totalTasks: int, reviewResult: ReviewResult): totalTasks: int, reviewResult: ReviewResult):
@ -227,7 +225,7 @@ class MessageCreator:
"taskProgress": "success" "taskProgress": "success"
} }
message = self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, []) self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
logger.info(f"Task completion message created for task {taskIndex}") logger.info(f"Task completion message created for task {taskIndex}")
except Exception as e: except Exception as e:
logger.error(f"Error creating task completion message: {str(e)}") logger.error(f"Error creating task completion message: {str(e)}")
@ -254,7 +252,7 @@ class MessageCreator:
"taskProgress": "retry" "taskProgress": "retry"
} }
message = self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, []) self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, [])
logger.info(f"Retry message created for task {taskIndex}") logger.info(f"Retry message created for task {taskIndex}")
except Exception as e: except Exception as e:
logger.error(f"Error creating retry message: {str(e)}") logger.error(f"Error creating retry message: {str(e)}")
@ -293,7 +291,7 @@ class MessageCreator:
"taskProgress": "fail" "taskProgress": "fail"
} }
message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
logger.info(f"Error message created for task {taskIndex}") logger.info(f"Error message created for task {taskIndex}")
except Exception as e: except Exception as e:
logger.error(f"Error creating error message: {str(e)}") logger.error(f"Error creating error message: {str(e)}")

View file

@ -147,12 +147,6 @@ class ReactMode(BaseMode):
context.improvements = [] context.improvements = []
context.improvements.append(f"Step {step}: {decision.get('reason')}") context.improvements.append(f"Step {step}: {decision.get('reason')}")
# Telemetry: simple duration per step
duration = time.time() - t0
self.services.workflow.storeLog(workflow, {
"message": f"react_step_duration_sec={duration:.3f}",
"type": "info"
})
lastReviewDict = decision lastReviewDict = decision
# Create user-friendly message AFTER action execution # Create user-friendly message AFTER action execution

View file

@ -142,10 +142,10 @@ class WorkflowManager:
self.services.rawUserPrompt = userInput.prompt self.services.rawUserPrompt = userInput.prompt
self.services.currentUserPrompt = userInput.prompt self.services.currentUserPrompt = userInput.prompt
self.workflowProcessor = WorkflowProcessor(self.services, workflow) self.workflowProcessor = WorkflowProcessor(self.services, workflow)
message = await self._sendFirstMessage(userInput, workflow) await self._sendFirstMessage(userInput, workflow)
task_plan = await self._planTasks(userInput, workflow) task_plan = await self._planTasks(userInput, workflow)
await self._executeTasks(task_plan, workflow) await self._executeTasks(task_plan, workflow)
await self._processWorkflowResults(workflow, message) await self._processWorkflowResults(workflow)
except WorkflowStoppedException: except WorkflowStoppedException:
self._handleWorkflowStop(workflow) self._handleWorkflowStop(workflow)
@ -155,7 +155,7 @@ class WorkflowManager:
# Helper functions # Helper functions
async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage: async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
"""Send first message to start workflow""" """Send first message to start workflow"""
try: try:
self.workflowProcessor._checkWorkflowStopped(workflow) self.workflowProcessor._checkWorkflowStopped(workflow)
@ -270,7 +270,7 @@ class WorkflowManager:
except Exception: except Exception:
pass pass
# Create documents for context items (in-memory ChatDocument; persistence via storeMessageWithDocuments) # Create documents for context items
if contextItems and isinstance(contextItems, list): if contextItems and isinstance(contextItems, list):
for idx, item in enumerate(contextItems): for idx, item in enumerate(contextItems):
try: try:
@ -317,8 +317,7 @@ class WorkflowManager:
logger.warning(f"Failed to process user fileIds: {e}") logger.warning(f"Failed to process user fileIds: {e}")
# Finally, persist and bind the first message with combined documents (context + user) # Finally, persist and bind the first message with combined documents (context + user)
created_message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs) self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs)
return created_message
except Exception as e: except Exception as e:
logger.error(f"Error sending first message: {str(e)}") logger.error(f"Error sending first message: {str(e)}")
@ -385,7 +384,7 @@ class WorkflowManager:
workflow.status = "completed" workflow.status = "completed"
return None return None
async def _processWorkflowResults(self, workflow: ChatWorkflow, initial_message: ChatMessage) -> None: async def _processWorkflowResults(self, workflow: ChatWorkflow) -> None:
"""Process workflow results based on workflow status and create appropriate messages""" """Process workflow results based on workflow status and create appropriate messages"""
try: try:
try: try:
@ -567,9 +566,7 @@ class WorkflowManager:
} }
# Create message using interface # Create message using interface
message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, []) self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
if message:
self.services.workflow.storeMessageWithDocuments(workflow, message.__dict__, getattr(message, 'documents', []))
# Update workflow status to completed # Update workflow status to completed
workflow.status = "completed" workflow.status = "completed"