From f86b3a9e2e33e9cc255f588254f3fcc8827543cd Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sat, 21 Jun 2025 03:06:00 +0200
Subject: [PATCH] task planner works
---
config.ini | 2 +-
modules/interfaces/interfaceChatObjects.py | 167 ++++++++--
modules/methods/methodCoder.py | 2 +-
modules/methods/methodDocument.py | 2 +-
modules/methods/methodExcel.py | 25 +-
modules/methods/methodOperator.py | 2 +-
modules/methods/methodOutlook.py | 25 +-
modules/methods/methodPowerpoint.py | 25 +-
modules/methods/methodSharepoint.py | 25 +-
modules/methods/methodWeb.py | 2 +-
modules/workflow/managerChat.py | 328 +++++++++++++-------
modules/workflow/managerWorkflow.py | 3 +-
modules/{methods => workflow}/methodBase.py | 0
modules/workflow/serviceContainer.py | 52 ++--
test_workflow.py | 180 ++++++++---
15 files changed, 589 insertions(+), 251 deletions(-)
rename modules/{methods => workflow}/methodBase.py (100%)
diff --git a/config.ini b/config.ini
index dddd934f..799d31ba 100644
--- a/config.ini
+++ b/config.ini
@@ -15,7 +15,7 @@ Connector_AiOpenai_MAX_TOKENS = 2000
# Anthropic configuration
Connector_AiAnthropic_API_URL = https://api.anthropic.com/v1/messages
Connector_AiAnthropic_API_SECRET = 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
-Connector_AiAnthropic_MODEL_NAME = claude-3-opus-20240229
+Connector_AiAnthropic_MODEL_NAME = claude-3-5-sonnet-20241022
Connector_AiAnthropic_TEMPERATURE = 0.2
Connector_AiAnthropic_MAX_TOKENS = 2000
diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py
index d939c394..a871e55f 100644
--- a/modules/interfaces/interfaceChatObjects.py
+++ b/modules/interfaces/interfaceChatObjects.py
@@ -9,12 +9,11 @@ import uuid
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
-import hashlib
import asyncio
from modules.interfaces.interfaceChatAccess import ChatAccess
from modules.interfaces.interfaceChatModel import (
- TaskStatus, UserInputRequest, ChatDocument, TaskItem, ChatStat, ChatLog, ChatMessage, ChatWorkflow, TaskAction
+ TaskStatus, UserInputRequest, ChatDocument, TaskItem, ChatStat, ChatLog, ChatMessage, ChatWorkflow, TaskAction, TaskResult, ActionResult
)
from modules.interfaces.interfaceAppModel import User
@@ -169,7 +168,7 @@ class ChatObjects:
logs=[ChatLog(**log) for log in workflow.get("logs", [])],
messages=[ChatMessage(**msg) for msg in workflow.get("messages", [])],
stats=ChatStat(**workflow.get("dataStats", {})) if workflow.get("dataStats") else None,
- mandateId=workflow.get("mandateId", self.currentUser.get("mandateId"))
+ mandateId=workflow.get("mandateId", self.currentUser.mandateId)
)
except Exception as e:
logger.error(f"Error validating workflow data: {str(e)}")
@@ -202,7 +201,7 @@ class ChatObjects:
logs=[],
messages=[],
stats=ChatStat(**created.get("dataStats", {})) if created.get("dataStats") else None,
- mandateId=created.get("mandateId", self.currentUser.get("mandateId"))
+ mandateId=created.get("mandateId", self.currentUser.mandateId)
)
def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> ChatWorkflow:
@@ -264,6 +263,9 @@ class ChatObjects:
def createWorkflowMessage(self, messageData: Dict[str, Any]) -> ChatMessage:
"""Creates a message for a workflow if user has access."""
try:
+ # Ensure ID is present
+ if "id" not in messageData or not messageData["id"]:
+ messageData["id"] = f"msg_{uuid.uuid4()}"
# Check required fields
requiredFields = ["id", "workflowId"]
for field in requiredFields:
@@ -299,16 +301,6 @@ class ChatObjects:
# Create message in database
createdMessage = self.db.recordCreate("workflowMessages", messageData)
- # Update workflow's messageIds if this is a new message
- if createdMessage:
- # Get current messageIds or initialize empty list
- messageIds = workflow.messageIds if hasattr(workflow, 'messageIds') else []
-
- # Add the new message ID if not already in the list
- if createdMessage["id"] not in messageIds:
- messageIds.append(createdMessage["id"])
- self.updateWorkflow(workflowId, {"messageIds": messageIds})
-
# Convert to ChatMessage model
return ChatMessage(
id=createdMessage["id"],
@@ -319,7 +311,7 @@ class ChatObjects:
message=createdMessage.get("message"),
role=createdMessage.get("role", "assistant"),
status=createdMessage.get("status", "step"),
- sequenceNr=len(messageIds), # Set sequence number based on message position
+ sequenceNr=len(workflow.messages) + 1, # Use messages list length for sequence number
publishedAt=createdMessage.get("publishedAt", self._getCurrentTimestamp()),
stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None
)
@@ -702,15 +694,6 @@ class ChatObjects:
messageCount = len(messages)
logger.debug(f"Loaded {messageCount} messages for workflow {workflowId}")
- # Check if messageIds exists and is valid
- messageIds = workflow.get("messageIds", [])
- if not messageIds or len(messageIds) != len(messages):
- # Rebuild messageIds from messages
- messageIds = [msg.get("id") for msg in messages]
- # Update in database
- self.updateWorkflow(workflowId, {"messageIds": messageIds})
- logger.debug(f"Rebuilt messageIds for workflow {workflowId}")
-
# Log document counts for each message
for msg in messages:
docCount = len(msg.get("documents", []))
@@ -725,7 +708,6 @@ class ChatObjects:
# Assemble complete workflow object
completeWorkflow = workflow.copy()
completeWorkflow["messages"] = messages
- completeWorkflow["messageIds"] = messageIds
completeWorkflow["logs"] = logs
return completeWorkflow
@@ -862,6 +844,7 @@ class ChatObjects:
return TaskItem(
id=task["id"],
workflowId=task["workflowId"],
+ userInput=task.get("userInput", ""),
status=task.get("status", TaskStatus.PENDING),
error=task.get("error"),
startedAt=task.get("startedAt"),
@@ -892,6 +875,9 @@ class ChatObjects:
def createTask(self, taskData: Dict[str, Any]) -> TaskItem:
"""Creates a new task if user has access to the workflow."""
try:
+ # Ensure ID is present
+ if "id" not in taskData or not taskData["id"]:
+ taskData["id"] = f"task_{uuid.uuid4()}"
# Check workflow access
workflowId = taskData.get("workflowId")
if not workflowId:
@@ -908,9 +894,6 @@ class ChatObjects:
return None
# Ensure required fields
- if "id" not in taskData:
- taskData["id"] = f"task_{uuid.uuid4()}"
-
if "status" not in taskData:
taskData["status"] = TaskStatus.PENDING
@@ -924,6 +907,7 @@ class ChatObjects:
task = TaskItem(
id=createdTask["id"],
workflowId=createdTask["workflowId"],
+ userInput=createdTask.get("userInput", ""),
status=createdTask.get("status", TaskStatus.PENDING),
error=createdTask.get("error"),
startedAt=createdTask.get("startedAt"),
@@ -938,7 +922,7 @@ class ChatObjects:
)
# Update workflow's task list
- workflowTasks = workflow.get("tasks", [])
+ workflowTasks = workflow.tasks if hasattr(workflow, 'tasks') else []
if task.id not in workflowTasks:
workflowTasks.append(task.id)
self.updateWorkflow(workflowId, {"tasks": workflowTasks})
@@ -975,6 +959,7 @@ class ChatObjects:
return TaskItem(
id=updatedTask["id"],
workflowId=updatedTask["workflowId"],
+ userInput=updatedTask.get("userInput", task.userInput),
status=updatedTask.get("status", task.status),
error=updatedTask.get("error", task.error),
startedAt=updatedTask.get("startedAt", task.startedAt),
@@ -1014,7 +999,7 @@ class ChatObjects:
# Delete task
if self.db.recordDelete("tasks", taskId):
# Update workflow's task list
- workflowTasks = workflow.get("tasks", [])
+ workflowTasks = workflow.tasks if hasattr(workflow, 'tasks') else []
if taskId in workflowTasks:
workflowTasks.remove(taskId)
self.updateWorkflow(task.workflowId, {"tasks": workflowTasks})
@@ -1025,6 +1010,128 @@ class ChatObjects:
logger.error(f"Error deleting task: {str(e)}")
return False
+ # Task Result Management
+
+ def createTaskResult(self, resultData: Dict[str, Any]) -> 'TaskResult':
+ """Creates a new task result if user has access to the workflow."""
+ try:
+ # Ensure ID is present
+ if "id" not in resultData or not resultData["id"]:
+ resultData["id"] = f"result_{uuid.uuid4()}"
+
+ # Check workflow access if taskId is provided
+ taskId = resultData.get("taskId")
+ if taskId:
+ task = self.getTask(taskId)
+ if task:
+ workflow = self.getWorkflow(task.workflowId)
+ if not workflow:
+ logger.warning(f"No access to workflow {task.workflowId}")
+ return None
+
+ if not self._canModify("workflows", task.workflowId):
+ logger.warning(f"No permission to modify workflow {task.workflowId}")
+ return None
+
+ # Ensure required fields
+ if "status" not in resultData:
+ resultData["status"] = TaskStatus.PENDING
+
+ if "success" not in resultData:
+ resultData["success"] = False
+
+ # Create result in database
+ createdResult = self.db.recordCreate("taskResults", resultData)
+
+ # Convert to TaskResult model
+ return TaskResult(
+ taskId=createdResult.get("taskId", ""),
+ status=createdResult.get("status", TaskStatus.PENDING),
+ success=createdResult.get("success", False),
+ feedback=createdResult.get("feedback"),
+ error=createdResult.get("error")
+ )
+
+ except Exception as e:
+ logger.error(f"Error creating task result: {str(e)}")
+ return None
+
+ def createActionResult(self, resultData: Dict[str, Any]) -> 'ActionResult':
+ """Creates a new action result."""
+ try:
+ # Ensure ID is present
+ if "id" not in resultData or not resultData["id"]:
+ resultData["id"] = f"action_result_{uuid.uuid4()}"
+
+ # Ensure required fields
+ if "success" not in resultData:
+ resultData["success"] = False
+
+ if "data" not in resultData:
+ resultData["data"] = {}
+
+ # Create result in database
+ createdResult = self.db.recordCreate("actionResults", resultData)
+
+ # Convert to ActionResult model
+ return ActionResult(
+ success=createdResult.get("success", False),
+ data=createdResult.get("data", {}),
+ metadata=createdResult.get("metadata", {}),
+ validation=createdResult.get("validation", []),
+ error=createdResult.get("error")
+ )
+
+ except Exception as e:
+ logger.error(f"Error creating action result: {str(e)}")
+ return None
+
+ def createTaskAction(self, actionData: Dict[str, Any]) -> TaskAction:
+ """Creates a new task action."""
+ try:
+ # Ensure ID is present
+ if "id" not in actionData or not actionData["id"]:
+ actionData["id"] = f"action_{uuid.uuid4()}"
+
+ # Ensure required fields
+ if "status" not in actionData:
+ actionData["status"] = TaskStatus.PENDING
+
+ if "execMethod" not in actionData:
+ logger.error("execMethod is required for task action")
+ return None
+
+ if "execAction" not in actionData:
+ logger.error("execAction is required for task action")
+ return None
+
+ if "execParameters" not in actionData:
+ actionData["execParameters"] = {}
+
+ # Create action in database
+ createdAction = self.db.recordCreate("taskActions", actionData)
+
+ # Convert to TaskAction model
+ return TaskAction(
+ id=createdAction["id"],
+ execMethod=createdAction["execMethod"],
+ execAction=createdAction["execAction"],
+ execParameters=createdAction.get("execParameters", {}),
+ execResultLabel=createdAction.get("execResultLabel"),
+ status=createdAction.get("status", TaskStatus.PENDING),
+ error=createdAction.get("error"),
+ retryCount=createdAction.get("retryCount", 0),
+ retryMax=createdAction.get("retryMax", 3),
+ processingTime=createdAction.get("processingTime"),
+ timestamp=datetime.fromisoformat(createdAction.get("timestamp", datetime.now().isoformat())),
+ result=createdAction.get("result"),
+ resultDocuments=createdAction.get("resultDocuments", [])
+ )
+
+ except Exception as e:
+ logger.error(f"Error creating task action: {str(e)}")
+ return None
+
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
"""
Returns a ChatObjects instance for the current user.
diff --git a/modules/methods/methodCoder.py b/modules/methods/methodCoder.py
index c570e456..2b394f7d 100644
--- a/modules/methods/methodCoder.py
+++ b/modules/methods/methodCoder.py
@@ -2,7 +2,7 @@ from typing import Dict, Any, Optional
import logging
from datetime import datetime, UTC
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py
index c4a38209..96efb3a1 100644
--- a/modules/methods/methodDocument.py
+++ b/modules/methods/methodDocument.py
@@ -7,7 +7,7 @@ import logging
from typing import Dict, Any, List, Optional
from modules.workflow.managerDocument import DocumentManager
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodExcel.py b/modules/methods/methodExcel.py
index 442204cd..a281d8b4 100644
--- a/modules/methods/methodExcel.py
+++ b/modules/methods/methodExcel.py
@@ -9,7 +9,7 @@ from datetime import datetime, UTC
import json
import base64
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
@@ -23,14 +23,21 @@ class ExcelService:
"""Get Microsoft connection from connection reference"""
try:
userConnection = self.serviceContainer.getUserConnectionFromConnectionReference(connectionReference)
- if userConnection and userConnection.authority == "microsoft" and userConnection.enabled:
- return {
- "id": userConnection.id,
- "accessToken": userConnection.accessToken,
- "refreshToken": userConnection.refreshToken,
- "scopes": userConnection.scopes
- }
- return None
+ if not userConnection or userConnection.authority != "msft" or userConnection.status != "active":
+ return None
+
+ # Get the corresponding token for this user and authority
+ token = self.serviceContainer.interfaceApp.getToken(userConnection.authority)
+ if not token:
+ logger.warning(f"No token found for user {userConnection.userId} and authority {userConnection.authority}")
+ return None
+
+ return {
+ "id": userConnection.id,
+ "accessToken": token.tokenAccess,
+ "refreshToken": token.tokenRefresh,
+ "scopes": ["Mail.ReadWrite", "User.Read"] # Default Microsoft scopes
+ }
except Exception as e:
logger.error(f"Error getting Microsoft connection: {str(e)}")
return None
diff --git a/modules/methods/methodOperator.py b/modules/methods/methodOperator.py
index a560c93b..9ee8b9b5 100644
--- a/modules/methods/methodOperator.py
+++ b/modules/methods/methodOperator.py
@@ -4,7 +4,7 @@ from typing import Dict, List, Any, Optional
from datetime import datetime, UTC
import logging
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py
index 1e4ed8f1..ea1b7a8b 100644
--- a/modules/methods/methodOutlook.py
+++ b/modules/methods/methodOutlook.py
@@ -8,7 +8,7 @@ from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
import json
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
@@ -22,14 +22,21 @@ class OutlookService:
"""Get Microsoft connection from connection reference"""
try:
userConnection = self.serviceContainer.getUserConnectionFromConnectionReference(connectionReference)
- if userConnection and userConnection.authority == "microsoft" and userConnection.enabled:
- return {
- "id": userConnection.id,
- "accessToken": userConnection.accessToken,
- "refreshToken": userConnection.refreshToken,
- "scopes": userConnection.scopes
- }
- return None
+ if not userConnection or userConnection.authority != "msft" or userConnection.status != "active":
+ return None
+
+ # Get the corresponding token for this user and authority
+ token = self.serviceContainer.interfaceApp.getToken(userConnection.authority)
+ if not token:
+ logger.warning(f"No token found for user {userConnection.userId} and authority {userConnection.authority}")
+ return None
+
+ return {
+ "id": userConnection.id,
+ "accessToken": token.tokenAccess,
+ "refreshToken": token.tokenRefresh,
+ "scopes": ["Mail.ReadWrite", "User.Read"] # Default Microsoft scopes
+ }
except Exception as e:
logger.error(f"Error getting Microsoft connection: {str(e)}")
return None
diff --git a/modules/methods/methodPowerpoint.py b/modules/methods/methodPowerpoint.py
index 72de822c..e8c177dd 100644
--- a/modules/methods/methodPowerpoint.py
+++ b/modules/methods/methodPowerpoint.py
@@ -9,7 +9,7 @@ from datetime import datetime, UTC
import json
import base64
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
@@ -23,14 +23,21 @@ class PowerpointService:
"""Get Microsoft connection from connection reference"""
try:
userConnection = self.serviceContainer.getUserConnectionFromConnectionReference(connectionReference)
- if userConnection and userConnection.authority == "microsoft" and userConnection.enabled:
- return {
- "id": userConnection.id,
- "accessToken": userConnection.accessToken,
- "refreshToken": userConnection.refreshToken,
- "scopes": userConnection.scopes
- }
- return None
+ if not userConnection or userConnection.authority != "msft" or userConnection.status != "active":
+ return None
+
+ # Get the corresponding token for this user and authority
+ token = self.serviceContainer.interfaceApp.getToken(userConnection.authority)
+ if not token:
+ logger.warning(f"No token found for user {userConnection.userId} and authority {userConnection.authority}")
+ return None
+
+ return {
+ "id": userConnection.id,
+ "accessToken": token.tokenAccess,
+ "refreshToken": token.tokenRefresh,
+ "scopes": ["Mail.ReadWrite", "User.Read"] # Default Microsoft scopes
+ }
except Exception as e:
logger.error(f"Error getting Microsoft connection: {str(e)}")
return None
diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py
index 2f2cf5b1..ee9d04f3 100644
--- a/modules/methods/methodSharepoint.py
+++ b/modules/methods/methodSharepoint.py
@@ -8,7 +8,7 @@ from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
import json
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
@@ -22,14 +22,21 @@ class SharepointService:
"""Get Microsoft connection from connection reference"""
try:
userConnection = self.serviceContainer.getUserConnectionFromConnectionReference(connectionReference)
- if userConnection and userConnection.authority == "microsoft" and userConnection.enabled:
- return {
- "id": userConnection.id,
- "accessToken": userConnection.accessToken,
- "refreshToken": userConnection.refreshToken,
- "scopes": userConnection.scopes
- }
- return None
+ if not userConnection or userConnection.authority != "msft" or userConnection.status != "active":
+ return None
+
+ # Get the corresponding token for this user and authority
+ token = self.serviceContainer.interfaceApp.getToken(userConnection.authority)
+ if not token:
+ logger.warning(f"No token found for user {userConnection.userId} and authority {userConnection.authority}")
+ return None
+
+ return {
+ "id": userConnection.id,
+ "accessToken": token.tokenAccess,
+ "refreshToken": token.tokenRefresh,
+ "scopes": ["Mail.ReadWrite", "User.Read"] # Default Microsoft scopes
+ }
except Exception as e:
logger.error(f"Error getting Microsoft connection: {str(e)}")
return None
diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py
index ced581a1..102ae090 100644
--- a/modules/methods/methodWeb.py
+++ b/modules/methods/methodWeb.py
@@ -10,7 +10,7 @@ import requests
from bs4 import BeautifulSoup
import time
-from modules.methods.methodBase import MethodBase, ActionResult, action
+from modules.workflow.methodBase import MethodBase, ActionResult, action
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py
index 614bc3b3..799a1d30 100644
--- a/modules/workflow/managerChat.py
+++ b/modules/workflow/managerChat.py
@@ -2,8 +2,6 @@ import logging
from typing import Dict, Any, Optional, List, Union
from datetime import datetime, UTC
import json
-import uuid
-import time
from modules.interfaces.interfaceAppModel import User
from modules.interfaces.interfaceChatModel import (
@@ -29,10 +27,39 @@ class ChatManager:
self.workflow = workflow
self.service = ServiceContainer(self.currentUser, self.workflow)
+ def _extractJsonFromResponse(self, response: str) -> Optional[Dict[str, Any]]:
+ """Extract JSON from verbose AI response that may contain explanatory text"""
+ try:
+ # First try direct JSON parsing
+ return json.loads(response)
+ except json.JSONDecodeError:
+ # Try to find JSON in the response
+ import re
+
+ # Look for JSON object patterns
+ json_patterns = [
+ r'\{.*\}', # Basic JSON object
+ r'\[\{.*\}\]', # JSON array of objects
+ ]
+
+ for pattern in json_patterns:
+ matches = re.findall(pattern, response, re.DOTALL)
+ for match in matches:
+ try:
+ return json.loads(match)
+ except json.JSONDecodeError:
+ continue
+
+ # If no JSON found, log the full response for debugging
+ logger.error(f"Could not extract JSON from response: {response[:500]}...")
+ return None
+
# ===== Task Creation and Management =====
async def createInitialTask(self, workflow: ChatWorkflow, initialMessage: ChatMessage) -> Optional[TaskItem]:
"""Create the initial task from the first message"""
try:
+ logger.info(f"Creating initial task for workflow {workflow.id}")
+
# Create task definition prompt
prompt = await self._createTaskDefinitionPrompt(initialMessage.message, workflow)
@@ -40,13 +67,13 @@ class ChatManager:
response = await self.service.callAiTextAdvanced(prompt)
# Parse response
- try:
- taskDef = json.loads(response)
- except json.JSONDecodeError:
- logger.error(f"Invalid JSON in task definition: {response}")
- return None
+ taskDef = self._extractJsonFromResponse(response)
# Validate task definition
+ if not taskDef:
+ logger.error("Could not extract valid JSON from AI response")
+ return None
+
if not isinstance(taskDef, dict):
logger.error("Task definition must be a JSON object")
return None
@@ -61,6 +88,8 @@ class ChatManager:
logger.error("Actions must be a list")
return None
+ logger.info(f"Task definition validated: {len(taskDef['actions'])} actions")
+
# Create task using interface
taskData = {
"workflowId": workflow.id,
@@ -79,17 +108,40 @@ class ChatManager:
if not all(field in actionDef for field in requiredFields):
continue
- action = TaskAction(
- id=str(uuid.uuid4()),
- execMethod=actionDef["method"],
- execAction=actionDef["action"],
- execParameters=actionDef["parameters"],
- execResultLabel=actionDef.get("resultLabel")
- )
- taskData["actionList"].append(action)
+ # Create action using interface
+ actionData = {
+ "execMethod": actionDef["method"],
+ "execAction": actionDef["action"],
+ "execParameters": actionDef["parameters"],
+ "execResultLabel": actionDef.get("resultLabel")
+ }
+ action = self.chatInterface.createTaskAction(actionData)
+ if action:
+ # Convert TaskAction object to dictionary for database storage
+ actionDict = {
+ "id": action.id,
+ "execMethod": action.execMethod,
+ "execAction": action.execAction,
+ "execParameters": action.execParameters,
+ "execResultLabel": action.execResultLabel,
+ "status": action.status,
+ "error": action.error,
+ "retryCount": action.retryCount,
+ "retryMax": action.retryMax,
+ "processingTime": action.processingTime,
+ "timestamp": action.timestamp.isoformat() if action.timestamp else None,
+ "result": action.result,
+ "resultDocuments": action.resultDocuments
+ }
+ taskData["actionList"].append(actionDict)
# Create task using interface
task = self.chatInterface.createTask(taskData)
+ if task:
+ logger.info(f"Task created successfully: {task.id}")
+ else:
+ logger.error("Failed to create task")
+
return task
except Exception as e:
@@ -99,6 +151,8 @@ class ChatManager:
async def createNextTask(self, workflow: ChatWorkflow, previousResult: TaskResult) -> Optional[TaskItem]:
"""Create next task based on previous result"""
try:
+ logger.info(f"Creating next task for workflow {workflow.id}")
+
# Check if previous result was successful
if not previousResult.success:
logger.error(f"Previous task failed: {previousResult.error}")
@@ -111,13 +165,13 @@ class ChatManager:
response = await self.service.callAiTextAdvanced(prompt)
# Parse response
- try:
- taskDef = json.loads(response)
- except json.JSONDecodeError:
- logger.error(f"Invalid JSON in task definition: {response}")
- return None
+ taskDef = self._extractJsonFromResponse(response)
# Validate task definition
+ if not taskDef:
+ logger.error("Could not extract valid JSON from AI response")
+ return None
+
if not isinstance(taskDef, dict):
logger.error("Task definition must be a JSON object")
return None
@@ -132,6 +186,8 @@ class ChatManager:
logger.error("Actions must be a list")
return None
+ logger.info(f"Next task definition validated: {len(taskDef['actions'])} actions")
+
# Create task using interface
taskData = {
"workflowId": workflow.id,
@@ -150,17 +206,40 @@ class ChatManager:
if not all(field in actionDef for field in requiredFields):
continue
- action = TaskAction(
- id=str(uuid.uuid4()),
- execMethod=actionDef["method"],
- execAction=actionDef["action"],
- execParameters=actionDef["parameters"],
- execResultLabel=actionDef.get("resultLabel")
- )
- taskData["actionList"].append(action)
+ # Create action using interface
+ actionData = {
+ "execMethod": actionDef["method"],
+ "execAction": actionDef["action"],
+ "execParameters": actionDef["parameters"],
+ "execResultLabel": actionDef.get("resultLabel")
+ }
+ action = self.chatInterface.createTaskAction(actionData)
+ if action:
+ # Convert TaskAction object to dictionary for database storage
+ actionDict = {
+ "id": action.id,
+ "execMethod": action.execMethod,
+ "execAction": action.execAction,
+ "execParameters": action.execParameters,
+ "execResultLabel": action.execResultLabel,
+ "status": action.status,
+ "error": action.error,
+ "retryCount": action.retryCount,
+ "retryMax": action.retryMax,
+ "processingTime": action.processingTime,
+ "timestamp": action.timestamp.isoformat() if action.timestamp else None,
+ "result": action.result,
+ "resultDocuments": action.resultDocuments
+ }
+ taskData["actionList"].append(actionDict)
# Create task using interface
task = self.chatInterface.createTask(taskData)
+ if task:
+ logger.info(f"Next task created successfully: {task.id}")
+ else:
+ logger.error("Failed to create next task")
+
return task
except Exception as e:
@@ -198,9 +277,8 @@ Example format:
response = await self.service.callAiTextBasic(prompt)
# Parse response
- try:
- result = json.loads(response)
- except json.JSONDecodeError:
+ result = self._extractJsonFromResponse(response)
+ if not result:
logger.error(f"Invalid JSON in action result: {response}")
action.status = "failed"
action.error = "Invalid result format"
@@ -229,6 +307,8 @@ Example format:
if message:
self.workflow.messages.append(message)
+ logger.info(f"Action execution logged: {action.execMethod}.{action.execAction} - {action.status}")
+
# If action failed, stop execution
if action.status == "failed":
break
@@ -324,33 +404,35 @@ Example format:
response = await self.service.callAiTextBasic(prompt)
# Parse response
- try:
- result = json.loads(response)
- except json.JSONDecodeError:
+ result = self._extractJsonFromResponse(response)
+ if not result:
logger.error(f"Invalid JSON in next task identification: {response}")
- return TaskResult(
- taskId=str(uuid.uuid4()),
- status="failed",
- success=False,
- error="Invalid result format"
- )
+ # Create error result using interface
+ errorResultData = {
+ "status": "failed",
+ "success": False,
+ "error": "Invalid result format"
+ }
+ return self.chatInterface.createTaskResult(errorResultData)
- return TaskResult(
- taskId=str(uuid.uuid4()),
- status="completed" if result.get("success", False) else "failed",
- success=result.get("success", False),
- feedback=result.get("feedback", ""),
- error=result.get("error", "")
- )
+ # Create result using interface
+ resultData = {
+ "status": "completed" if result.get("success", False) else "failed",
+ "success": result.get("success", False),
+ "feedback": result.get("feedback", ""),
+ "error": result.get("error", "")
+ }
+ return self.chatInterface.createTaskResult(resultData)
except Exception as e:
logger.error(f"Error identifying next task: {str(e)}")
- return TaskResult(
- taskId=str(uuid.uuid4()),
- status="failed",
- success=False,
- error=str(e)
- )
+ # Create error result using interface
+ errorResultData = {
+ "status": "failed",
+ "success": False,
+ "error": str(e)
+ }
+ return self.chatInterface.createTaskResult(errorResultData)
async def generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
"""Generate final feedback for the workflow"""
@@ -408,79 +490,93 @@ Please provide a comprehensive summary of this workflow."""
docRefs = self.service.getDocumentReferenceList()
connRefs = self.service.getConnectionReferenceList()
- return f"""
-Task Definition for: {userInput}
+ prompt = f"""You are a task planning AI that creates structured task definitions in JSON format.
-Chat History:
-{messageSummary}
+TASK REQUEST: {userInput}
-Available Methods:
-{chr(10).join(f"- {method}" for method in methodList)}
+CONTEXT:
+Chat History: {messageSummary}
-Available Documents:
-{chr(10).join(f"- {doc['documentReference']} ({doc['datetime']})" for doc in docRefs.get('chat', []))}
+AVAILABLE RESOURCES:
+Methods: {chr(10).join(f"- {method}" for method in methodList)}
-Available Connections:
-{chr(10).join(f"- {conn['connectionReference']} ({conn['authority']})" for conn in connRefs)}
+Documents: {chr(10).join(f"- {doc['documentReference']} ({doc['datetime']})" for doc in docRefs.get('chat', []))}
-Your Task:
-1. Analyze the user input and chat history
-2. Determine what actions are needed to accomplish the task
-3. Create a sequence of actions using only the available methods, documents, and connections
-4. Provide feedback about what will be done and what needs to be done next
+Connections: {chr(10).join(f"- {conn['connectionReference']} ({conn['authority']})" for conn in connRefs)}
-Required Output:
-1. A JSON object containing:
- - status: Current state of the task ("pending", "running", "completed", or "failed")
- - feedback: Explanation of what will be done and what needs to be done next
- - actions: List of actions to execute, each containing:
- * method: The method to use
- * action: The specific action to perform
- * parameters: Required parameters for the action
- * resultLabel: Label for the action's result
+INSTRUCTIONS:
+1. Analyze the task request and available resources
+2. Create a sequence of actions to accomplish the task
+3. Use ONLY the provided methods, documents, and connections
+4. Return a VALID JSON object with the exact structure shown below
-2. Available Data:
- - Use only provided document references in Available Documents section
- - Use only provided connection references in Available Connections section
-
-3. Method Usage Rules:
- - Syntax: method.action([parameter:type])->resultLabel:type
- - resultLabel format: documentList__