gateway/modules/workflow/managerChat.py
2025-06-12 17:52:51 +02:00

781 lines
No EOL
30 KiB
Python

import logging
from typing import Dict, Any, Optional, List, Union
from datetime import datetime, UTC
import json
import uuid
import time
from modules.interfaces.serviceAppModel import User
from modules.interfaces.serviceChatModel import (
TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow
)
from modules.workflow.serviceContainer import ServiceContainer
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ChatManager:
    """Chat manager with improved AI integration and method handling.

    The constructor only stores the current user. ``initialize`` assigns the
    workflow and builds the service container, so it has to be awaited before
    any method that reads ``self.workflow`` or ``self.service`` is used.
    """

    def __init__(self, currentUser: User):
        """Store the user; workflow and service stay unset until ``initialize``.

        Args:
            currentUser: User passed on to the service container later.
        """
        self.currentUser = currentUser
        # Declared up front so both attributes always exist on the instance,
        # even when initialize() has not been awaited yet.
        self.workflow: Optional[ChatWorkflow] = None
        self.service: Optional[ServiceContainer] = None
# ===== Initialization and Setup =====
async def initialize(self, workflow: ChatWorkflow) -> None:
    """Bind this manager to ``workflow`` and create its service container.

    Args:
        workflow: Workflow stored on the manager and handed to the
            ServiceContainer together with the current user.
    """
    self.workflow = workflow
    container = ServiceContainer(self.currentUser, workflow)
    self.service = container
# ===== Task Creation and Management =====
async def createInitialTask(self, workflow: ChatWorkflow, initialMessage: ChatMessage) -> Optional[TaskItem]:
    """Build and register the first task of a workflow from its first message.

    The message text is sent through the AI together with the method catalog;
    the returned objective and actions become the task payload.

    Args:
        workflow: Workflow the task belongs to (its ``id`` is stored).
        initialMessage: First message; ``message`` is analysed and
            ``documents`` become the task's ``dataList``.

    Returns:
        Whatever ``self.service.createTask`` returns, or None when any step
        raises (the error is logged, not propagated).
    """
    try:
        catalog = self.service.getMethodsCatalog()
        # Let the AI turn the raw input into an objective plus action specs.
        parsed = await self._processUserInput(initialMessage.message, catalog)
        plannedActions = await self._createActions(parsed['actions'])
        createdTask = self.service.createTask({
            "workflowId": workflow.id,
            "userInput": parsed['objective'],
            "dataList": initialMessage.documents,
            "actionList": [planned.dict() for planned in plannedActions],
            "status": TaskStatus.PENDING,
            "startedAt": datetime.now(UTC).isoformat(),
            "updatedAt": datetime.now(UTC).isoformat()
        })
        if not createdTask:
            return createdTask
        # Remember the freshly created task as the service's current one.
        self.service.currentTask = createdTask
        return createdTask
    except Exception as e:
        logger.error(f"Error creating initial task: {str(e)}")
        return None
async def createNextTask(self, workflow: ChatWorkflow, previousResult: TaskResult) -> Optional[TaskItem]:
    """Create the follow-up task described by a previous task result.

    Args:
        workflow: Workflow the new task belongs to.
        previousResult: Result whose ``data`` carries ``objective`` and
            ``actions`` for the next task.

    Returns:
        The value returned by ``self.service.createTask``; None when the
        previous result failed, carried no data, or an exception occurred.
    """
    try:
        if not previousResult.success:
            logger.error(f"Previous task failed: {previousResult.error}")
            return None

        priorData = previousResult.data
        if not priorData:
            logger.error("No task data in previous result")
            return None

        # Build the payload step by step; key order matches insertion order.
        payload = {
            "workflowId": workflow.id,
            "userInput": priorData.get('objective', ''),
        }
        followUpActions = await self._createActions(priorData.get('actions', []))
        payload["actionList"] = [item.dict() for item in followUpActions]
        payload["status"] = TaskStatus.PENDING
        payload["startedAt"] = datetime.now(UTC).isoformat()
        payload["updatedAt"] = datetime.now(UTC).isoformat()

        createdTask = self.service.createTask(payload)
        if createdTask:
            self.service.currentTask = createdTask
        return createdTask
    except Exception as e:
        logger.error(f"Error creating next task: {str(e)}")
        return None
async def identifyNextTask(self, workflow: ChatWorkflow) -> TaskResult:
    """Ask the AI which task should come next, based on the workflow summary.

    Args:
        workflow: Not read directly here; the summary is built from
            ``self.workflow`` by ``_summarizeWorkflow``.

    Returns:
        A COMPLETED TaskResult whose ``data`` is the parsed JSON answer, or a
        FAILED TaskResult carrying the error text when the answer is not
        valid JSON or any other exception occurs.
    """
    def failure(reason: str) -> TaskResult:
        # Shared shape of both failure results.
        return TaskResult(
            taskId=f"analysis_{datetime.now(UTC).timestamp()}",
            status=TaskStatus.FAILED,
            success=False,
            timestamp=datetime.now(UTC),
            error=reason,
            documentsLabel="Task Error"
        )

    try:
        overview = await self._summarizeWorkflow()
        request = f"""Based on the workflow summary:
{overview}
Determine what the next task should be.
Return a JSON object with:
- objective: The main goal or task to accomplish
- actions: List of required actions with method and parameters
"""
        aiReply = await self.service.callAiBasic(request)
        try:
            parsed = json.loads(aiReply)
            return TaskResult(
                taskId=f"analysis_{datetime.now(UTC).timestamp()}",
                status=TaskStatus.COMPLETED,
                success=True,
                timestamp=datetime.now(UTC),
                data=parsed,
                documentsLabel="Task Results"
            )
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing AI response: {str(e)}")
            return failure(f"Error parsing AI response: {str(e)}")
    except Exception as e:
        logger.error(f"Error identifying next task: {str(e)}")
        return failure(f"Error identifying next task: {str(e)}")
async def generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
    """Generate a closing feedback message for a workflow in the user's language.

    Args:
        workflow: Workflow whose status, messages, documents and start time
            are summarised for the AI.

    Returns:
        str: The AI-generated feedback, or the fixed text
        "Workflow completed successfully." when anything raises.
    """
    try:
        status = workflow.status
        messageCount = len(workflow.messages)
        documentCount = sum(len(entry.documents) for entry in workflow.messages)
        # Elapsed wall-clock time since the workflow's recorded start.
        startedAt = datetime.fromisoformat(workflow.startedAt)
        elapsedSeconds = (datetime.now(UTC) - startedAt).total_seconds()

        userLanguage = self.service.user.language

        systemContent = "".join([
            "You are an AI assistant providing a summary of a completed workflow. ",
            f"Please respond in '{userLanguage}' language. ",
            "Summarize the workflow's activities, outcomes, and any important points. ",
            "Be concise but informative. Use a professional but friendly tone.",
        ])
        userContent = "\n".join([
            "Please provide a summary of this workflow:",
            f"Status: {status}",
            f"Total Messages: {messageCount}",
            f"Total Documents: {documentCount}",
            f"Duration: {elapsedSeconds:.1f} seconds",
        ])
        messages = [
            {"role": "system", "content": systemContent},
            {"role": "user", "content": userContent},
        ]

        # Only user messages and those flagged "first"/"last" are added as context.
        messages.extend(
            {"role": entry.role, "content": entry.message}
            for entry in workflow.messages
            if entry.role == "user" or entry.status in ["first", "last"]
        )

        return await self.service.callAi(messages, produceUserAnswer=True, temperature=0.7)
    except Exception as e:
        logger.error(f"Error generating workflow feedback: {str(e)}")
        # NOTE(review): this fallback is returned regardless of workflow.status.
        return "Workflow completed successfully."
def _generatePrompt(self, task: str, document: ChatDocument, examples: List[Dict[str, str]] = None) -> str:
    """Compose a prompt from a task description, a document and optional examples.

    Args:
        task: Task text placed on the first line.
        document: Document whose ``filename`` and ``mimeType`` are listed.
        examples: Optional dicts with ``input``/``output`` keys; missing keys
            render as empty strings.

    Returns:
        The assembled prompt, or "" when building it raises.
    """
    try:
        pieces = [f"Task: {task}\nDocument: {document.filename} ({document.mimeType})\n"]
        if examples:
            pieces.append("\nExamples:\n")
            pieces.extend(
                f"Input: {sample.get('input', '')}\nOutput: {sample.get('output', '')}\n\n"
                for sample in examples
            )
        return "".join(pieces)
    except Exception as e:
        logger.error(f"Error generating prompt: {str(e)}")
        return ""
# ===== Task Execution and Processing =====
async def executeTask(self, task: TaskItem) -> TaskItem:
    """Execute a task's actions in sequence, retrying failed actions.

    Each action is attempted until it succeeds or ``action.retryCount``
    reaches ``action.retryMax``. Every failed attempt is reported to the
    workflow as a "step" message. When an action runs out of retries the task
    is marked FAILED and returned immediately; later actions are not run.

    Args:
        task: Task whose ``actionList`` is executed. It is mutated in place
            (status, error/feedback and timing fields).

    Returns:
        The same ``task`` object with status COMPLETED or FAILED. Timing
        fields are set on every exit path.
    """
    start_time = time.time()
    try:
        task.startedAt = datetime.now(UTC).isoformat()
        task.status = TaskStatus.RUNNING

        for action in task.actionList:
            # Loop on the SAME action: a bare `continue` in the outer for-loop
            # would skip to the next action instead of retrying this one.
            while True:
                error_msg = await self._attemptAction(task, action)
                if error_msg is None:
                    break
                if action.retryCount < action.retryMax:
                    action.retryCount += 1
                    continue
                # Out of retries: fail the whole task and stop processing.
                task.error = f"Action {action.id} failed after {action.retryCount} retries: {error_msg}"
                task.status = TaskStatus.FAILED
                task.processingTime = time.time() - start_time
                task.finishedAt = datetime.now(UTC).isoformat()
                return task

        if all(action.isSuccessful() for action in task.actionList):
            task.status = TaskStatus.COMPLETED
            task.feedback = "Task completed successfully"
            closing_message = ChatMessage(
                workflowId=task.workflowId,
                message=task.feedback,
                role="system",
                status="last",
                sequenceNr=0,  # Will be set by workflow
                publishedAt=datetime.now(UTC).isoformat(),
                success=True
            )
        else:
            # Defensive branch: kept in case an action reports itself as
            # unsuccessful without having produced a failed attempt above.
            task.status = TaskStatus.FAILED
            task.error = "One or more actions failed"
            closing_message = ChatMessage(
                workflowId=task.workflowId,
                message=f"Task failed: {task.getErrorMessage()}",
                role="system",
                status="last",
                sequenceNr=0,  # Will be set by workflow
                publishedAt=datetime.now(UTC).isoformat(),
                success=False
            )
        await self._addMessageToWorkflow(task.workflowId, closing_message)

        task.processingTime = time.time() - start_time
        task.finishedAt = datetime.now(UTC).isoformat()
        return task
    except Exception as e:
        # Unexpected error outside a single action attempt.
        task.status = TaskStatus.FAILED
        task.error = str(e)
        task.processingTime = time.time() - start_time
        task.finishedAt = datetime.now(UTC).isoformat()
        error_message = ChatMessage(
            workflowId=task.workflowId,
            message=f"Task failed with unexpected error: {task.getErrorMessage()}",
            role="system",
            status="last",
            sequenceNr=0,  # Will be set by workflow
            publishedAt=datetime.now(UTC).isoformat(),
            success=False
        )
        await self._addMessageToWorkflow(task.workflowId, error_message)
        return task

async def _attemptAction(self, task: TaskItem, action: TaskAction) -> Optional[str]:
    """Run one attempt of ``action``; return None on success, else the error text.

    On failure the action is marked via ``setError`` and an error "step"
    message is added to the workflow. Retry bookkeeping is left to the caller.
    """
    try:
        action_start = time.time()
        # _executeAction is defined outside this block.
        result = await self._executeAction(action)
        action.processingTime = time.time() - action_start
        if result.success:
            await self._publishActionResult(task, action, result)
            return None
        error_msg = result.error or "Unknown error"
    except Exception as e:
        error_msg = str(e)

    action.setError(error_msg)
    error_message = ChatMessage(
        workflowId=task.workflowId,
        message=f"{action.execMethod}.{action.execAction}.error: {error_msg}",
        role="system",
        status="step",
        sequenceNr=0,  # Will be set by workflow
        publishedAt=datetime.now(UTC).isoformat(),
        success=False,
        actionId=action.id,
        actionMethod=action.execMethod,
        actionName=action.execAction
    )
    await self._addMessageToWorkflow(task.workflowId, error_message)
    return error_msg

async def _publishActionResult(self, task: TaskItem, action: TaskAction, result: Any) -> None:
    """Mark ``action`` successful and store its label, documents and result labels."""
    action.setSuccess()
    # A successful result without a data payload is treated as empty.
    data = result.data or {}

    if data.get("resultLabel"):
        action.execResultLabel = data["resultLabel"]

    if data.get("documents"):
        # Persist each AI-generated document, then announce them in one message.
        documents = []
        for doc in data["documents"]:
            document = await self.service.createDocument(
                fileName=doc["filename"],
                mimeType=doc["mimeType"],
                content=doc["content"],
                base64encoded=doc["base64Encoded"]
            )
            documents.append(document)
        success_message = ChatMessage(
            workflowId=task.workflowId,
            message=f"{action.execMethod}.{action.execAction}",
            role="system",
            status="step",
            sequenceNr=0,  # Will be set by workflow
            publishedAt=datetime.now(UTC).isoformat(),
            documents=documents,
            documentsLabel=action.execResultLabel,  # Use the label from action
            success=True,
            actionId=action.id,
            actionMethod=action.execMethod,
            actionName=action.execAction
        )
        await self._addMessageToWorkflow(task.workflowId, success_message)

    if data.get("labels"):
        task.resultLabels.update(data["labels"])
async def parseTaskResult(self, workflow: ChatWorkflow, task: TaskItem) -> None:
    """Store a task's feedback as a workflow message and add its time to the stats.

    Args:
        workflow: Workflow receiving the message and the stats update.
        task: Finished task; ``feedback`` and ``processingTime`` are read.

    Raises:
        Exception: Any error is logged and then re-raised unchanged.
    """
    try:
        if task.feedback:
            feedbackMessage = ChatMessage(
                id=str(uuid.uuid4()),
                workflowId=workflow.id,
                role="assistant",
                message=task.feedback,
                status="step",
                documents=task.getResultDocuments()
            )
            self.service.createWorkflowMessage(feedbackMessage.dict())

        # Nothing to accumulate when the task recorded no processing time.
        if not task.processingTime:
            return

        if not workflow.stats:
            workflow.stats = ChatStat()
        previousTotal = workflow.stats.processingTime or 0
        workflow.stats.processingTime = previousTotal + task.processingTime
        self.service.updateWorkflow(workflow.id, {"stats": workflow.stats.dict()})
    except Exception as e:
        logger.error(f"Error parsing task result: {str(e)}")
        raise
async def shouldContinue(self, workflow: ChatWorkflow) -> bool:
    """Decide whether the workflow should keep going.

    Returns True only when the workflow is not in a terminal state and
    ``self.service.tasks`` contains at least one PENDING task and at least
    one RUNNING task. Any exception is logged and yields False.

    Args:
        workflow: Workflow whose ``status`` is checked.
    """
    try:
        if workflow.status in ("completed", "failed", "stopped"):
            return False

        knownTasks = self.service.tasks
        if not any(entry.status == TaskStatus.PENDING for entry in knownTasks):
            return False

        # NOTE(review): pending tasks with nothing running yield False here —
        # confirm this is intended by the caller's loop.
        return any(entry.status == TaskStatus.RUNNING for entry in knownTasks)
    except Exception as e:
        logger.error(f"Error checking workflow continuation: {str(e)}")
        return False
async def _summarizeWorkflow(self) -> str:
"""Summarize workflow history"""
if not self.workflow.messages:
return ""
prompt = f"""Summarize the following chat history:
{json.dumps([m.dict() for m in self.workflow.messages], indent=2)}
Please provide a concise summary focusing on:
1. Main objectives
2. Key actions taken
3. Current status
4. Any issues or blockers
"""
return await self.service.callAiBasic(prompt)
async def _analyzeTaskResults(self, task: TaskItem) -> Dict[str, Any]:
    """Ask the AI whether the workflow is complete and what should follow.

    Args:
        task: Task whose ``userInput``, ``status`` and ``error`` are put into
            the prompt next to the workflow summary.

    Returns:
        The AI answer parsed with ``json.loads``; parse errors are not caught
        here and propagate to the caller.
    """
    overview = await self._summarizeWorkflow()
    request = f"""Based on the workflow summary and task results:
{overview}
Task: {task.userInput}
Status: {task.status}
Error: {task.error or 'None'}
Determine if the workflow is complete and what the next steps should be.
Return a JSON object with:
- isComplete: boolean
- objective: string
- nextActions: array of action objects
"""
    aiReply = await self.service.callAiBasic(request)
    return json.loads(aiReply)
def _promptInstructions(self, methodCatalog: Dict[str, Any], isInitialTask: bool = False) -> str:
"""Generate common prompt instructions for task analysis"""
instructions = f"""Available Methods and Actions:
{json.dumps(methodCatalog, indent=2)}
"""
if isInitialTask:
instructions += """Please provide a JSON response with:
1. objective: The main goal or task to accomplish
2. actions: List of required actions with method and parameters
Example format:
{
"objective": "Search for documents about project X",
"actions": [
{
"method": "sharepoint",
"action": "search",
"parameters": {
"query": "project X",
"site": "projects"
}
}
]
}"""
else:
instructions += """Please provide a JSON response with:
1. isComplete: Whether the workflow is complete
2. nextActions: List of next actions needed (if any)
3. issues: Any issues or blockers identified
Example format:
{
"isComplete": false,
"nextActions": [
{
"method": "sharepoint",
"action": "read",
"parameters": {
"documentId": "doc123"
}
}
],
"issues": ["Need authentication for SharePoint"]
}
Note: Only use methods and actions that are available in the method catalog above."""
return instructions
async def _processUserInput(self, userInput: Dict[str, Any], methodCatalog: Dict[str, Any]) -> Dict[str, Any]:
"""Process user input with AI to extract objectives and actions"""
# Create prompt with available methods and actions
prompt = f"""Given the following user input and available methods/actions, extract the objective and required actions:
User Input: {userInput.get('message', '')}
{self._promptInstructions(methodCatalog, isInitialTask=True)}"""
# Call AI service
response = await self.service.callAiBasic(prompt)
return json.loads(response)
async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[TaskAction]:
    """Create TaskAction objects from AI-provided action specifications.

    Entries lacking ``method`` or ``action`` are skipped with a warning;
    entries whose construction raises are logged and skipped.

    Args:
        actionsData: Action dicts with ``method``, ``action`` and optional
            ``parameters`` / ``retryMax``. None is treated as an empty list.

    Returns:
        The successfully created actions, in input order.
    """
    actions = []
    for actionData in actionsData or []:
        try:
            if not all(key in actionData for key in ('method', 'action')):
                logger.warning(f"Skipping invalid action data: {actionData}")
                continue
            # NOTE(review): executeTask reads action.execMethod/execAction
            # while method/action are passed here — confirm TaskAction maps them.
            action = TaskAction(
                # uuid4 keeps ids unique even when several actions are created
                # within the same clock tick (timestamps could collide).
                id=f"action_{uuid.uuid4()}",
                method=actionData['method'],
                action=actionData['action'],
                parameters=actionData.get('parameters', {}),
                status=TaskStatus.PENDING,
                retryCount=0,
                retryMax=actionData.get('retryMax', 3)
            )
            actions.append(action)
        except Exception as e:
            logger.error(f"Error creating action: {str(e)}")
            continue
    return actions
async def _processTaskResults(self, task: TaskItem) -> str:
    """Generate output documents for a task via AI and return textual feedback.

    The parsed document list is stored on ``task.documentsOutput``.

    Args:
        task: Task whose ``userInput`` drives both prompts.

    Returns:
        The AI feedback text, or an "Error processing results: ..." string
        when the document answer is not valid JSON or anything else raises.
    """
    try:
        documentRequest = self._generateDocumentPrompt(task.userInput)
        documentReply = await self.service.callAiBasic(documentRequest)

        try:
            task.documentsOutput = json.loads(documentReply)
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing document response: {str(e)}")
            return f"Error processing results: {str(e)}"

        feedbackRequest = f"""Generate feedback for the completed task:
Task: {task.userInput}
Generated Documents: {len(task.documentsOutput)} files
Provide a concise summary of what was accomplished.
"""
        return await self.service.callAiBasic(feedbackRequest)
    except Exception as e:
        logger.error(f"Error processing task results: {str(e)}")
        return f"Error processing results: {str(e)}"
async def _createOutputDocuments(self, task: TaskItem) -> List[ChatDocument]:
    """Store a task's generated documents as files and wrap them as ChatDocuments.

    Args:
        task: Task whose ``documentsOutput`` entries provide ``filename``,
            ``mimeType``, ``data`` and ``base64Encoded``. Entries may be
            plain dicts (as produced by ``json.loads``) or objects exposing
            the same names as attributes.

    Returns:
        ChatDocuments for the stored files; [] when there is nothing to
        store or any step raises (the error is logged).
    """
    def field(entry: Any, name: str) -> Any:
        # _processTaskResults assigns json.loads output, i.e. dicts, so
        # attribute access alone would fail on every entry.
        return entry[name] if isinstance(entry, dict) else getattr(entry, name)

    try:
        fileIds = []
        for taskDoc in task.documentsOutput or []:
            fileItem = self.service.createFile(
                fileName=field(taskDoc, "filename"),
                mimeType=field(taskDoc, "mimeType"),
                content=field(taskDoc, "data"),
                base64Encoded=field(taskDoc, "base64Encoded")
            )
            fileIds.append(fileItem.id)

        # Convert all stored files to ChatDocuments in one call.
        if fileIds:
            return await self.processFileIds(fileIds)
        return []
    except Exception as e:
        logger.error(f"Error creating output documents: {str(e)}")
        return []
async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]:
    """Build a ChatDocument for every file id that has metadata.

    Only metadata from ``self.service.getFileInfo`` is read; ids without
    metadata are skipped with a warning.

    Args:
        fileIds: Ids of the files to wrap.

    Returns:
        One ChatDocument per resolvable id, in input order.
    """
    collected = []
    for currentId in fileIds:
        meta = self.service.getFileInfo(currentId)
        if meta:
            collected.append(ChatDocument(
                id=str(uuid.uuid4()),
                fileId=currentId,
                filename=meta.get("name", "Unknown"),
                fileSize=meta.get("size", 0),
                mimeType=meta.get("mimeType", "text/plain")
            ))
        else:
            logger.warning(f"File metadata not found for {currentId}")
    return collected
def _generateDocumentPrompt(self, task: str) -> str:
"""Generate a prompt for document generation"""
return f"""Generate output documents for the following task:
Task: {task}
For each document you need to generate, provide a Document object with the following structure:
{{
"filename": "string", # Filename with extension
"mimeType": "string", # MIME type of the file
"data": "string", # File content as text or base64
"base64Encoded": boolean # True if data is base64 encoded
}}
Rules:
1. For text files (txt, json, xml, etc.), provide content directly in the data field
2. For binary files (images, videos, etc.), encode content in base64 and set base64Encoded to true
3. Use appropriate MIME types (e.g., text/plain, image/jpeg, application/pdf)
4. Include file extensions in filenames
Return a JSON array of Document objects.
"""
def _createTaskDefinitionPrompt(self, userInput: str, workflow: ChatWorkflow) -> str:
    """Build the task-definition prompt from the data the service exposes.

    Args:
        userInput: Text the task is defined for.
        workflow: Workflow whose last message (if any) is passed to
            ``self.service.getMessageSummary``.

    Returns:
        Prompt listing methods, history, documents and connections, followed
        by the JSON result format and usage rules.
    """
    methodList = self.service.getMethodsList()
    latestMessage = workflow.messages[-1] if workflow.messages else None
    messageSummary = self.service.getMessageSummary(latestMessage)
    docRefs = self.service.getDocumentReferenceList()
    connRefs = self.service.getConnectionReferenceList()

    # Render each list section up front as "- item" lines.
    methodLines = "\n".join(f"- {method}" for method in methodList)
    historyLines = "\n".join(f"- {msg['message']}" for msg in messageSummary.get('chat', []))
    documentLines = "\n".join(
        f"- {doc['documentReference']} ({doc['datetime']})" for doc in docRefs.get('chat', [])
    )
    connectionLines = "\n".join(f"- {conn}" for conn in connRefs)

    return f"""
Task Definition for: {userInput}
Available Methods:
{methodLines}
Workflow History:
{historyLines}
Available Documents:
{documentLines}
Available Connections:
{connectionLines}
Instructions:
1. Result Format (JSON):
{{
"status": "pending|running|completed|failed",
"feedback": "string explaining what was done and what needs to be done next",
"actions": [
{{
"method": "string",
"action": "string",
"parameters": {{
"param1": "value1",
"param2": "value2"
}},
"resultLabel": "documentList_<uuid>_<label>"
}}
]
}}
2. Available Data:
- Use only provided document references (format: document_<id>_<filename> or documentList_<action.id>_<label>)
- Use only provided connection references
- Use result labels from previous actions in the same task
3. Method Usage Rules:
- Syntax: method.action([parameter:type])->resultLabel:type
- resultLabel format: documentList_<uuid>_<label>
- Actions must be in processing sequence
- Parameters must be from:
* Available document references
* Available connection references
* Result labels from previous actions
4. Result Labels:
- Use consistent naming for related documents
- Include descriptive labels for document sets
- Labels will be used to track document sets in messages
5. Error Handling:
- Include validation for each action
- Specify retry behavior if needed
- Provide clear error messages
- Errors will be recorded in messages with .error: suffix
Please provide the task definition in JSON format following these rules.
"""