From eebd995d64d2d9fb374c3fa2d660e4c4ef6fe43e Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 12 Jun 2025 17:52:51 +0200
Subject: [PATCH] structuring action logic
---
modules/interfaces/serviceChatModel.py | 248 +++++++----------
modules/workflow/managerChat.py | 361 +++++++++++++++++++------
modules/workflow/serviceContainer.py | 162 ++++++++++-
notes/changelog.txt | 43 ++-
4 files changed, 545 insertions(+), 269 deletions(-)
diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py
index f81a882b..b7d2a488 100644
--- a/modules/interfaces/serviceChatModel.py
+++ b/modules/interfaces/serviceChatModel.py
@@ -42,7 +42,6 @@ class TaskStatus(str, Enum):
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
- ROLLED_BACK = "rolled_back"
# Register labels for TaskStatus
register_model_labels(
@@ -164,204 +163,131 @@ register_model_labels(
class TaskAction(BaseModel, ModelMixin):
"""Model for task actions"""
- id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique action identifier")
- method: str = Field(..., description="Method to execute")
- action: str = Field(..., description="Action to perform")
- parameters: Dict[str, Any] = Field(default_factory=dict, description="Action parameters")
- status: TaskStatus = Field(default=TaskStatus.PENDING, description="Current action status")
- retryCount: int = Field(default=0, description="Number of retry attempts")
- retryMax: int = Field(default=3, description="Maximum number of retry attempts")
+ id: str = Field(..., description="Action ID")
+ execMethod: str = Field(..., description="Method to execute")
+ execAction: str = Field(..., description="Action to perform")
+ execParameters: Dict[str, Any] = Field(default_factory=dict, description="Action parameters")
+ execResultLabel: Optional[str] = Field(None, description="Label for the set of result documents")
+ status: TaskStatus = Field(default=TaskStatus.PENDING, description="Action status")
error: Optional[str] = Field(None, description="Error message if action failed")
- startedAt: Optional[datetime] = Field(None, description="Action start timestamp")
- finishedAt: Optional[datetime] = Field(None, description="Action completion timestamp")
+ retryCount: int = Field(default=0, description="Number of retries attempted")
+ retryMax: int = Field(default=3, description="Maximum number of retries")
+ processingTime: Optional[float] = Field(None, description="Processing time in seconds")
+ timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC), description="When the action was executed")
- def start(self) -> None:
- """Start the action"""
- self.status = TaskStatus.RUNNING
- self.startedAt = datetime.now(UTC)
+ def isSuccessful(self) -> bool:
+ """Check if action was successful"""
+ return self.status == TaskStatus.COMPLETED
- def complete(self) -> None:
- """Mark action as completed"""
- self.status = TaskStatus.COMPLETED
- self.finishedAt = datetime.now(UTC)
+ def hasError(self) -> bool:
+ """Check if action has an error"""
+ return self.status == TaskStatus.FAILED
- def fail(self, error: str) -> None:
- """Mark action as failed"""
- self.status = TaskStatus.FAILED
+ def getErrorMessage(self) -> Optional[str]:
+ """Get error message if any"""
+ return self.error if self.hasError() else None
+
+ def setError(self, error: str) -> None:
+ """Set action error"""
self.error = error
- self.finishedAt = datetime.now(UTC)
+ self.status = TaskStatus.FAILED
- def canRetry(self) -> bool:
- """Check if action can be retried"""
- return self.retryCount < self.retryMax
-
- def incrementRetry(self) -> None:
- """Increment retry count"""
- self.retryCount += 1
+ def setSuccess(self) -> None:
+ """Set action as successful"""
+ self.status = TaskStatus.COMPLETED
+ self.error = None
# Register labels for TaskAction
register_model_labels(
"TaskAction",
{"en": "Task Action", "fr": "Action de tâche"},
{
- "id": {"en": "ID", "fr": "ID"},
- "method": {"en": "Method", "fr": "Méthode"},
- "action": {"en": "Action", "fr": "Action"},
- "parameters": {"en": "Parameters", "fr": "Paramètres"},
+ "id": {"en": "Action ID", "fr": "ID de l'action"},
+ "execMethod": {"en": "Method", "fr": "Méthode"},
+ "execAction": {"en": "Action", "fr": "Action"},
+ "execParameters": {"en": "Parameters", "fr": "Paramètres"},
"status": {"en": "Status", "fr": "Statut"},
- "retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
- "retryMax": {"en": "Max Retries", "fr": "Tentatives maximales"},
"error": {"en": "Error", "fr": "Erreur"},
- "startedAt": {"en": "Started At", "fr": "Démarré le"},
- "finishedAt": {"en": "Finished At", "fr": "Terminé le"}
+ "retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
+ "retryMax": {"en": "Max Retries", "fr": "Tentatives max"},
+ "resultDocuments": {"en": "Result Documents", "fr": "Documents du résultat"},
+ "execResultLabel": {"en": "Document Label", "fr": "Label du document"},
+ "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"},
+ "timestamp": {"en": "Timestamp", "fr": "Horodatage"}
}
)
class TaskItem(BaseModel, ModelMixin):
- """Model for tasks"""
- id: str = Field(..., description="Unique task identifier")
- workflowId: str = Field(..., description="Associated workflow ID")
- status: TaskStatus = Field(default=TaskStatus.PENDING, description="Current task status")
+ """Model for workflow tasks"""
+ id: str = Field(..., description="Task ID")
+ workflowId: str = Field(..., description="Workflow ID")
+ userInput: str = Field(..., description="User input that triggered the task")
+ status: TaskStatus = Field(default=TaskStatus.PENDING, description="Task status")
error: Optional[str] = Field(None, description="Error message if task failed")
- startedAt: Optional[datetime] = Field(None, description="Task start timestamp")
- finishedAt: Optional[datetime] = Field(None, description="Task completion timestamp")
+ startedAt: Optional[str] = Field(None, description="When the task started")
+ finishedAt: Optional[str] = Field(None, description="When the task finished")
actionList: List[TaskAction] = Field(default_factory=list, description="List of actions to execute")
- documentsOutput: List[Dict[str, Any]] = Field(default_factory=list, description="Output documents")
- retryCount: int = Field(default=0, description="Number of retry attempts")
- retryMax: int = Field(default=3, description="Maximum number of retry attempts")
+ retryCount: int = Field(default=0, description="Number of retries attempted")
+ retryMax: int = Field(default=3, description="Maximum number of retries")
rollbackOnFailure: bool = Field(default=True, description="Whether to rollback on failure")
- dependencies: List[str] = Field(default_factory=list, description="List of dependent task IDs")
- feedback: Optional[Dict[str, Any]] = Field(None, description="Task feedback data")
+ dependencies: List[str] = Field(default_factory=list, description="List of task IDs this task depends on")
+ feedback: Optional[str] = Field(None, description="Task feedback message")
+ processingTime: Optional[float] = Field(None, description="Total processing time in seconds")
+ resultLabels: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Map of result labels to their values")
- def isCompleted(self) -> bool:
- """Check if task is completed"""
+ def isSuccessful(self) -> bool:
+ """Check if task was successful"""
return self.status == TaskStatus.COMPLETED
- def isFailed(self) -> bool:
- """Check if task has failed"""
+ def hasError(self) -> bool:
+ """Check if task has an error"""
return self.status == TaskStatus.FAILED
- def canRetry(self) -> bool:
- """Check if task can be retried"""
- return self.retryCount < self.retryMax
+ def getErrorMessage(self) -> Optional[str]:
+ """Get error message if any"""
+ return self.error if self.hasError() else None
- def start(self) -> None:
- """Start the task"""
- self.status = TaskStatus.RUNNING
- self.startedAt = datetime.now(UTC)
+ def getResultDocuments(self) -> List[ChatDocument]:
+ """Get all documents from all successful actions"""
+ documents = []
+ for action in self.actionList:
+ if action.isSuccessful() and action.resultDocuments:
+ documents.extend(action.resultDocuments)
+ return documents
- def complete(self) -> None:
- """Mark task as completed"""
- self.status = TaskStatus.COMPLETED
- self.finishedAt = datetime.now(UTC)
+ def getResultDocumentLabel(self) -> Optional[str]:
+ """Get the label for the result documents"""
+ for action in self.actionList:
+ if action.isSuccessful() and action.execResultLabel:
+ return action.execResultLabel
+ return None
- def fail(self, error: str) -> None:
- """Mark task as failed"""
- self.status = TaskStatus.FAILED
- self.error = error
- self.finishedAt = datetime.now(UTC)
-
- def cancel(self) -> None:
- """Cancel the task"""
- self.status = TaskStatus.CANCELLED
- self.finishedAt = datetime.now(UTC)
-
- def rollback(self) -> None:
- """Mark task as rolled back"""
- self.status = TaskStatus.ROLLED_BACK
- self.finishedAt = datetime.now(UTC)
-
- def incrementRetry(self) -> None:
- """Increment retry count"""
- self.retryCount += 1
-
- def addDependency(self, taskId: str) -> None:
- """Add a task dependency"""
- if taskId not in self.dependencies:
- self.dependencies.append(taskId)
-
- def removeDependency(self, taskId: str) -> None:
- """Remove a task dependency"""
- if taskId in self.dependencies:
- self.dependencies.remove(taskId)
-
- def addAction(self, action: TaskAction) -> None:
- """Add an action to the task"""
- self.actionList.append(action)
-
- def addDocumentOutput(self, document: Dict[str, Any]) -> None:
- """Add an output document"""
- self.documentsOutput.append(document)
-
- def setFeedback(self, feedback: Dict[str, Any]) -> None:
- """Set task feedback"""
- self.feedback = feedback
+ def getResultLabel(self, label: str) -> Optional[Any]:
+ """Get value for a specific result label"""
+ return self.resultLabels.get(label) if self.resultLabels else None
# Register labels for TaskItem
register_model_labels(
"TaskItem",
{"en": "Task", "fr": "Tâche"},
{
- "id": {"en": "ID", "fr": "ID"},
- "workflowId": {"en": "Workflow ID", "fr": "ID du flux de travail"},
+ "id": {"en": "Task ID", "fr": "ID de la tâche"},
+ "workflowId": {"en": "Workflow ID", "fr": "ID du workflow"},
+ "userInput": {"en": "User Input", "fr": "Entrée utilisateur"},
"status": {"en": "Status", "fr": "Statut"},
"error": {"en": "Error", "fr": "Erreur"},
- "startedAt": {"en": "Started At", "fr": "Démarré le"},
- "finishedAt": {"en": "Finished At", "fr": "Terminé le"},
- "actionList": {"en": "Action List", "fr": "Liste d'actions"},
- "documentsOutput": {"en": "Output Documents", "fr": "Documents de sortie"},
+ "startedAt": {"en": "Started At", "fr": "Démarré à"},
+ "finishedAt": {"en": "Finished At", "fr": "Terminé à"},
+ "actionList": {"en": "Actions", "fr": "Actions"},
"retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
- "retryMax": {"en": "Max Retries", "fr": "Tentatives maximales"},
- "rollbackOnFailure": {"en": "Rollback on Failure", "fr": "Annulation en cas d'échec"},
+ "retryMax": {"en": "Max Retries", "fr": "Tentatives max"},
+ "rollbackOnFailure": {"en": "Rollback On Failure", "fr": "Annuler en cas d'échec"},
"dependencies": {"en": "Dependencies", "fr": "Dépendances"},
- "feedback": {"en": "Task Feedback", "fr": "Retour sur la tâche"}
- }
-)
-
-class TaskResult(BaseModel, ModelMixin):
- """Model for task execution results"""
- taskId: str = Field(..., description="ID of the task this result belongs to")
- status: TaskStatus = Field(..., description="Result status")
- success: bool = Field(..., description="Whether the task was successful")
- error: Optional[str] = Field(None, description="Error message if task failed")
- data: Optional[Dict[str, Any]] = Field(None, description="Result data")
- documents: List[ChatDocument] = Field(default_factory=list, description="Output documents")
- documentsLabel: Optional[str] = Field(None, description="Label for the set of documents")
- feedback: Optional[str] = Field(None, description="Task feedback message")
- processingTime: Optional[float] = Field(None, description="Processing time in seconds")
- timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC), description="When the result was created")
-
- def isSuccessful(self) -> bool:
- """Check if result indicates success"""
- return self.success and self.status == TaskStatus.COMPLETED
-
- def hasError(self) -> bool:
- """Check if result has an error"""
- return not self.success or self.status == TaskStatus.FAILED
-
- def getErrorMessage(self) -> Optional[str]:
- """Get error message if any"""
- return self.error if self.hasError() else None
-
-# Register labels for TaskResult
-register_model_labels(
- "TaskResult",
- {"en": "Task Result", "fr": "Résultat de la tâche"},
- {
- "taskId": {"en": "Task ID", "fr": "ID de la tâche"},
- "status": {"en": "Status", "fr": "Statut"},
- "success": {"en": "Success", "fr": "Succès"},
- "error": {"en": "Error", "fr": "Erreur"},
- "data": {"en": "Data", "fr": "Données"},
- "documents": {"en": "Documents", "fr": "Documents"},
"feedback": {"en": "Feedback", "fr": "Retour"},
- "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"},
- "timestamp": {"en": "Timestamp", "fr": "Horodatage"}
+ "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"}
}
)
-# ===== Message and Workflow Models =====
-
class ChatStat(BaseModel, ModelMixin):
"""Data model for chat statistics"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
@@ -420,6 +346,7 @@ class ChatMessage(BaseModel, ModelMixin):
workflowId: str = Field(description="Foreign key to workflow")
parentMessageId: Optional[str] = Field(None, description="Parent message ID for threading")
documents: List[ChatDocument] = Field(default_factory=list, description="Associated documents")
+ documentsLabel: Optional[str] = Field(None, description="Label for the set of documents")
message: Optional[str] = Field(None, description="Message content")
role: str = Field(description="Role of the message sender")
status: str = Field(description="Status of the message (first, step, last)")
@@ -427,6 +354,9 @@ class ChatMessage(BaseModel, ModelMixin):
publishedAt: str = Field(description="When the message was published")
stats: Optional[ChatStat] = Field(None, description="Statistics for this message")
success: Optional[bool] = Field(None, description="Whether the message processing was successful")
+ actionId: Optional[str] = Field(None, description="ID of the action that produced this message")
+ actionMethod: Optional[str] = Field(None, description="Method of the action that produced this message")
+ actionName: Optional[str] = Field(None, description="Name of the action that produced this message")
# Register labels for ChatMessage
register_model_labels(
@@ -437,13 +367,17 @@ register_model_labels(
"workflowId": {"en": "Workflow ID", "fr": "ID du flux de travail"},
"parentMessageId": {"en": "Parent Message ID", "fr": "ID du message parent"},
"documents": {"en": "Documents", "fr": "Documents"},
+ "documentsLabel": {"en": "Documents Label", "fr": "Label des documents"},
"message": {"en": "Message", "fr": "Message"},
"role": {"en": "Role", "fr": "Rôle"},
"status": {"en": "Status", "fr": "Statut"},
"sequenceNr": {"en": "Sequence Number", "fr": "Numéro de séquence"},
"publishedAt": {"en": "Published At", "fr": "Publié le"},
"stats": {"en": "Statistics", "fr": "Statistiques"},
- "success": {"en": "Success", "fr": "Succès"}
+ "success": {"en": "Success", "fr": "Succès"},
+ "actionId": {"en": "Action ID", "fr": "ID de l'action"},
+ "actionMethod": {"en": "Action Method", "fr": "Méthode de l'action"},
+ "actionName": {"en": "Action Name", "fr": "Nom de l'action"}
}
)
diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py
index 510e6668..a605bc91 100644
--- a/modules/workflow/managerChat.py
+++ b/modules/workflow/managerChat.py
@@ -3,6 +3,7 @@ from typing import Dict, Any, Optional, List, Union
from datetime import datetime, UTC
import json
import uuid
+import time
from modules.interfaces.serviceAppModel import User
from modules.interfaces.serviceChatModel import (
@@ -50,7 +51,10 @@ class ChatManager:
}
# Create task using ChatInterface
- return self.service.createTask(taskData)
+ task = self.service.createTask(taskData)
+ if task:
+ self.service.currentTask = task
+ return task
except Exception as e:
logger.error(f"Error creating initial task: {str(e)}")
@@ -81,7 +85,10 @@ class ChatManager:
}
# Create task using ChatInterface
- return self.service.createTask(taskData)
+ task = self.service.createTask(taskData)
+ if task:
+ self.service.currentTask = task
+ return task
except Exception as e:
logger.error(f"Error creating next task: {str(e)}")
@@ -221,111 +228,215 @@ Document: {document.filename} ({document.mimeType})
return ""
# ===== Task Execution and Processing =====
- async def executeTask(self, task: TaskItem) -> TaskResult:
- """Execute a task and return its result"""
+ async def executeTask(self, task: TaskItem) -> TaskItem:
+ """Execute a task with its list of actions"""
try:
- # Create result object
- result = TaskResult(
- taskId=task.id,
- status=task.status,
- success=True,
- timestamp=datetime.now(UTC)
- )
-
# Start timing
- startTime = datetime.now(UTC)
-
- # Execute each action
+ start_time = time.time()
+ task.startedAt = datetime.now(UTC).isoformat()
+ task.status = TaskStatus.RUNNING
+
+ # Execute each action in sequence
for action in task.actionList:
try:
# Execute action
- actionResult = await action.execute()
-
- # Update action status
- action.status = actionResult.status
- if actionResult.error:
- action.error = actionResult.error
-
- except Exception as e:
- logger.error(f"Action execution error: {str(e)}")
- action.status = TaskStatus.FAILED
- action.error = str(e)
-
- # Calculate processing time
- endTime = datetime.now(UTC)
- result.processingTime = (endTime - startTime).total_seconds()
-
- # Update task status
- if all(action.status == TaskStatus.COMPLETED for action in task.actionList):
- result.status = TaskStatus.COMPLETED
- result.success = True
- else:
- result.status = TaskStatus.FAILED
- result.success = False
- result.error = "One or more actions failed"
-
- # Generate feedback and documents if task completed successfully
- if result.status == TaskStatus.COMPLETED:
- # Generate feedback using AI
- result.feedback = await self._processTaskResults(task)
-
- # Create output documents
- result.documents = await self._createOutputDocuments(task)
- # Set documents label based on task input
- result.documentsLabel = "TaskResult"
- else:
- result.feedback = f"Task failed: {result.error}"
-
- # Update task in database
- self.service.updateTask(task.id, {
- "status": result.status,
- "error": result.error,
- "finishedAt": datetime.now(UTC).isoformat(),
- "actionList": [action.dict() for action in task.actionList],
- "documentsOutput": result.documents,
- "feedback": result.feedback,
- "documentsLabel": result.documentsLabel
- })
-
- return result
-
- except Exception as e:
- logger.error(f"Task execution error: {str(e)}")
- raise
+ action_start = time.time()
+ result = await self._executeAction(action)
+ action.processingTime = time.time() - action_start
- async def parseTaskResult(self, workflow: ChatWorkflow, result: TaskResult) -> None:
+ # Validate result
+ if not result.success:
+ error_msg = result.error or "Unknown error"
+ action.setError(error_msg)
+
+ # Create error message
+ error_message = ChatMessage(
+ workflowId=task.workflowId,
+ message=f"{action.execMethod}.{action.execAction}.error: {error_msg}",
+ role="system",
+ status="step",
+ sequenceNr=0, # Will be set by workflow
+ publishedAt=datetime.now(UTC).isoformat(),
+ success=False,
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction
+ )
+
+ # Add error message to workflow
+ await self._addMessageToWorkflow(task.workflowId, error_message)
+
+ # If action failed and we have retries left, retry
+ if action.retryCount < action.retryMax:
+ action.retryCount += 1
+ continue
+
+ # If we're out of retries, fail the task
+ task.error = f"Action {action.id} failed after {action.retryCount} retries: {error_msg}"
+ task.status = TaskStatus.FAILED
+ return task
+
+ # Process successful result
+ action.setSuccess()
+
+ # Set result label from AI response if provided
+ if result.data.get("resultLabel"):
+ action.execResultLabel = result.data["resultLabel"]
+
+ # Create result message with documents if any
+ if result.data.get("documents"):
+ # Store AI-generated documents in database and create ChatDocuments
+ documents = []
+ for doc in result.data["documents"]:
+ # Create document (which also creates the file)
+ document = await self.service.createDocument(
+ fileName=doc["filename"],
+ mimeType=doc["mimeType"],
+ content=doc["content"],
+ base64encoded=doc["base64Encoded"]
+ )
+ documents.append(document)
+
+ # Create success message with documents
+ success_message = ChatMessage(
+ workflowId=task.workflowId,
+ message=f"{action.execMethod}.{action.execAction}",
+ role="system",
+ status="step",
+ sequenceNr=0, # Will be set by workflow
+ publishedAt=datetime.now(UTC).isoformat(),
+ documents=documents,
+ documentsLabel=action.execResultLabel, # Use the label from action
+ success=True,
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction
+ )
+
+ # Add success message to workflow
+ await self._addMessageToWorkflow(task.workflowId, success_message)
+
+ # Store result labels
+ if result.data.get("labels"):
+ task.resultLabels.update(result.data["labels"])
+
+ except Exception as e:
+ error_msg = str(e)
+ action.setError(error_msg)
+
+ # Create error message
+ error_message = ChatMessage(
+ workflowId=task.workflowId,
+ message=f"{action.execMethod}.{action.execAction}.error: {error_msg}",
+ role="system",
+ status="step",
+ sequenceNr=0, # Will be set by workflow
+ publishedAt=datetime.now(UTC).isoformat(),
+ success=False,
+ actionId=action.id,
+ actionMethod=action.execMethod,
+ actionName=action.execAction
+ )
+
+ # Add error message to workflow
+ await self._addMessageToWorkflow(task.workflowId, error_message)
+
+ # If action failed and we have retries left, retry
+ if action.retryCount < action.retryMax:
+ action.retryCount += 1
+ continue
+
+ # If we're out of retries, fail the task
+ task.error = f"Action {action.id} failed after {action.retryCount} retries: {error_msg}"
+ task.status = TaskStatus.FAILED
+ return task
+
+ # Check if all actions were successful
+ if all(action.isSuccessful() for action in task.actionList):
+ task.status = TaskStatus.COMPLETED
+ task.feedback = "Task completed successfully"
+
+ # Create chat message with results
+ message = ChatMessage(
+ workflowId=task.workflowId,
+ message=task.feedback,
+ role="system",
+ status="last",
+ sequenceNr=0, # Will be set by workflow
+ publishedAt=datetime.now(UTC).isoformat(),
+ success=True
+ )
+
+ # Add message to workflow
+ await self._addMessageToWorkflow(task.workflowId, message)
+
+ else:
+ # If any action failed, fail the task
+ task.status = TaskStatus.FAILED
+ task.error = "One or more actions failed"
+
+ # Create error message
+ error_message = ChatMessage(
+ workflowId=task.workflowId,
+ message=f"Task failed: {task.getErrorMessage()}",
+ role="system",
+ status="last",
+ sequenceNr=0, # Will be set by workflow
+ publishedAt=datetime.now(UTC).isoformat(),
+ success=False
+ )
+
+ # Add error message to workflow
+ await self._addMessageToWorkflow(task.workflowId, error_message)
+
+ # Calculate processing time
+ task.processingTime = time.time() - start_time
+ task.finishedAt = datetime.now(UTC).isoformat()
+
+ return task
+
+ except Exception as e:
+ # Handle unexpected errors
+ task.status = TaskStatus.FAILED
+ task.error = str(e)
+ task.finishedAt = datetime.now(UTC).isoformat()
+
+ # Create error message
+ error_message = ChatMessage(
+ workflowId=task.workflowId,
+ message=f"Task failed with unexpected error: {task.getErrorMessage()}",
+ role="system",
+ status="last",
+ sequenceNr=0, # Will be set by workflow
+ publishedAt=datetime.now(UTC).isoformat(),
+ success=False
+ )
+
+ # Add error message to workflow
+ await self._addMessageToWorkflow(task.workflowId, error_message)
+
+ return task
+
+ async def parseTaskResult(self, workflow: ChatWorkflow, task: TaskItem) -> None:
"""Process and store task result in workflow"""
try:
- # Find task in workflow
- task = self.service.getTask(result.taskId)
- if not task:
- logger.error(f"Task {result.taskId} not found in workflow")
- return
-
- # Update task status
- self.service.updateTask(task.id, {
- "status": result.status,
- "error": result.error,
- "finishedAt": datetime.now(UTC).isoformat()
- })
-
# Create feedback message if available
- if result.feedback:
+ if task.feedback:
message = ChatMessage(
id=str(uuid.uuid4()),
workflowId=workflow.id,
role="assistant",
- message=result.feedback,
+ message=task.feedback,
status="step",
- documents=result.documents
+ documents=task.getResultDocuments()
)
self.service.createWorkflowMessage(message.dict())
# Update workflow stats
- if result.processingTime:
+ if task.processingTime:
if not workflow.stats:
workflow.stats = ChatStat()
- workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime
+ workflow.stats.processingTime = (workflow.stats.processingTime or 0) + task.processingTime
self.service.updateWorkflow(workflow.id, {"stats": workflow.stats.dict()})
except Exception as e:
@@ -593,4 +704,78 @@ Rules:
4. Include file extensions in filenames
Return a JSON array of Document objects.
-"""
\ No newline at end of file
+"""
+
+ def _createTaskDefinitionPrompt(self, userInput: str, workflow: ChatWorkflow) -> str:
+ """Create prompt for task definition"""
+ # Get available methods
+ methodList = self.service.getMethodsList()
+
+ # Get workflow history
+ messageSummary = self.service.getMessageSummary(workflow.messages[-1] if workflow.messages else None)
+
+ # Get available documents and connections
+ docRefs = self.service.getDocumentReferenceList()
+ connRefs = self.service.getConnectionReferenceList()
+
+ prompt = f"""
+Task Definition for: {userInput}
+
+Available Methods:
+{chr(10).join(f"- {method}" for method in methodList)}
+
+Workflow History:
+{chr(10).join(f"- {msg['message']}" for msg in messageSummary.get('chat', []))}
+
+Available Documents:
+{chr(10).join(f"- {doc['documentReference']} ({doc['datetime']})" for doc in docRefs.get('chat', []))}
+
+Available Connections:
+{chr(10).join(f"- {conn}" for conn in connRefs)}
+
+Instructions:
+1. Result Format (JSON):
+{{
+ "status": "pending|running|completed|failed",
+ "feedback": "string explaining what was done and what needs to be done next",
+ "actions": [
+ {{
+ "method": "string",
+ "action": "string",
+ "parameters": {{
+ "param1": "value1",
+ "param2": "value2"
+ }},
+ "resultLabel": "documentList__