gateway/modules/workflow/managerChat.py
2025-06-10 18:19:33 +02:00

499 lines
No EOL
19 KiB
Python

import logging
import importlib
import pkgutil
import inspect
from typing import Dict, Any, Optional, List, Type, Callable, Awaitable
from datetime import datetime, UTC
import json
import asyncio
import base64
from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
from modules.workflow.serviceContainer import ServiceContainer
from modules.interfaces.serviceChatModel import (
AgentTask, AgentAction, AgentResult, Action, TaskStatus, ChatWorkflow,
ChatMessage, ChatDocument, ChatStat, ExtractedContent, ContentItem
)
from modules.workflow.processorDocument import DocumentProcessor
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
class ChatManager:
"""Chat manager with improved AI integration and method handling"""
def __init__(self):
    """Create the service container, discover pluggable methods, and reset per-chat state."""
    # The container must exist before discovery can register methods into it.
    self.service = ServiceContainer()
    self._discoverMethods()
    # Per-conversation state; populated later by initialize()/createInitialTask().
    self.documentProcessor = DocumentProcessor()
    self.workflow: Optional[ChatWorkflow] = None
    self.currentTask: Optional[AgentTask] = None
    self.workflowHistory: List[ChatMessage] = []
def _discoverMethods(self):
    """Scan the modules.methods package and register an instance of every MethodBase subclass."""
    try:
        package = importlib.import_module('modules.methods')
        # Candidate modules follow the 'method*' naming convention; skip sub-packages.
        candidates = [
            name for _, name, isPkg in pkgutil.iter_modules(package.__path__)
            if not isPkg and name.startswith('method')
        ]
        for name in candidates:
            try:
                module = importlib.import_module(f'modules.methods.{name}')
                for _, cls in inspect.getmembers(module, inspect.isclass):
                    # Only concrete subclasses — never the abstract base itself.
                    if issubclass(cls, MethodBase) and cls != MethodBase:
                        instance = cls()
                        self.service.methods[instance.name] = instance
                        logger.info(f"Discovered method: {instance.name}")
            except Exception as e:
                # One broken module must not abort discovery of the rest.
                logger.error(f"Error loading method module {name}: {str(e)}")
    except Exception as e:
        logger.error(f"Error discovering methods: {str(e)}")
async def initialize(self, workflow: ChatWorkflow) -> None:
"""Initialize chat manager with workflow"""
self.service.workflow = workflow
# Initialize AI model
self.service.model = {
'callAiBasic': self._callAiBasic,
'callAiAdvanced': self._callAiAdvanced
}
# Initialize document processor
self.service.documentProcessor.initialize()
def _generatePrompt(self, task: str, document: ChatDocument, examples: List[Dict[str, str]] = None) -> str:
"""Generate a prompt based on task and document"""
try:
# Create base prompt
prompt = f"""Task: {task}
Document: {document.filename} ({document.mimeType})
"""
# Add examples if provided
if examples:
prompt += "\nExamples:\n"
for example in examples:
prompt += f"Input: {example.get('input', '')}\n"
prompt += f"Output: {example.get('output', '')}\n\n"
return prompt
except Exception as e:
logger.error(f"Error generating prompt: {str(e)}")
return ""
async def createInitialTask(self, userInput: Dict[str, Any]) -> AgentTask:
    """Create the first task of the workflow from raw user input.

    Args:
        userInput: Dict with at least 'message' (free text) and optionally
            'connections' (data sources attached to the task).

    Returns:
        The new pending AgentTask, also stored as service.tasks['current'].
    """
    # Catalog of methods/actions the AI may plan with.
    methodCatalog = self.service.getAvailableMethods()
    # Let the AI extract the objective and the concrete actions.
    processedInput = await self._processUserInput(userInput, methodCatalog)
    actions = await self._createActions(processedInput['actions'])
    # Fall back to the service's workflow reference: self.workflow may not
    # have been populated (initialize() historically only set the service copy),
    # which previously caused AttributeError on self.workflow.id.
    workflow = self.workflow or self.service.workflow
    # One timestamp for id/createdAt/updatedAt so they cannot drift.
    now = datetime.now(UTC)
    task = AgentTask(
        id=f"task_{now.timestamp()}",
        workflowId=workflow.id,
        userInput=processedInput['objective'],
        dataList=userInput.get('connections', []),
        actionList=actions,
        status=TaskStatus.PENDING,
        createdAt=now,
        updatedAt=now
    )
    self.service.tasks['current'] = task
    return task
async def executeCurrentTask(self) -> None:
"""Execute current task"""
task = self.service.tasks.get('current')
if not task:
raise ValueError("No current task to execute")
await self.service.executeTask(task)
async def defineNextTask(self) -> Optional[AgentTask]:
    """Decide whether the workflow continues; if so, queue and return the next task.

    Returns:
        The new task, or None when the workflow is complete, no current
        task exists, or analysis fails (the current task is then FAILED).
    """
    task = self.service.tasks.get('current')
    if not task:
        return None
    try:
        analysis = await self._analyzeTaskResults(task)
        if analysis['isComplete']:
            # Nothing left to do — close out the current task.
            task.status = TaskStatus.COMPLETED
            task.updatedAt = datetime.now(UTC)
            return None
        # More work required: rotate current -> previous, queue the new task.
        nextTask = self._createNextTask(task, analysis)
        self.service.tasks['previous'] = task
        self.service.tasks['current'] = nextTask
        return nextTask
    except Exception as e:
        logger.error(f"Error defining next task: {e}")
        task.status = TaskStatus.FAILED
        task.updatedAt = datetime.now(UTC)
        return None
async def _processUserInput(self, userInput: Dict[str, Any], methodCatalog: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the AI to turn raw user input into an objective plus planned actions.

    Args:
        userInput: Dict whose 'message' key holds the user's free text.
        methodCatalog: Available methods/actions the AI may plan with.

    Returns:
        Parsed dict with 'objective' (str) and 'actions' (list of dicts).
    """
    # The prompt shows the AI the user's message plus every allowed
    # method/action, and pins the expected JSON shape with an example.
    prompt = f"""Given the following user input and available methods/actions, extract the objective and required actions:
User Input: {userInput.get('message', '')}
Available Methods and Actions:
{json.dumps(methodCatalog, indent=2)}
Please provide a JSON response with:
1. objective: The main goal or task to accomplish
2. actions: List of required actions with method and parameters
Example format:
{{
"objective": "Search for documents about project X",
"actions": [
{{
"method": "sharepoint",
"action": "search",
"parameters": {{
"query": "project X",
"site": "projects"
}}
}}
]
}}
"""
    # Call AI service
    response = await self._callAiBasic(prompt)
    # NOTE(review): assumes the model returns bare JSON — json.loads raises
    # if the reply is wrapped in prose or code fences; verify upstream.
    return json.loads(response)
async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[AgentAction]:
    """Build AgentAction objects from AI-planned action dicts.

    Args:
        actionsData: Dicts with 'method', 'action' and optional 'parameters'.

    Returns:
        An action per entry whose method is registered; entries naming an
        unknown method are skipped with a warning.
    """
    actions = []
    for actionData in actionsData:
        method = self.service.getMethod(actionData['method'])
        if not method:
            # Previously skipped silently, which made mis-planned actions
            # impossible to diagnose from the logs.
            logger.warning(f"Skipping action with unknown method: {actionData['method']}")
            continue
        # One timestamp per action so id/createdAt/updatedAt cannot drift.
        now = datetime.now(UTC)
        actions.append(AgentAction(
            id=f"action_{now.timestamp()}",
            method=actionData['method'],
            action=actionData['action'],
            parameters=actionData.get('parameters', {}),
            status=TaskStatus.PENDING,
            createdAt=now,
            updatedAt=now
        ))
    return actions
async def _summarizeWorkflow(self) -> str:
"""Summarize workflow history"""
if not self.workflow.messages:
return ""
prompt = f"""Summarize the following chat history:
{json.dumps([m.dict() for m in self.workflow.messages], indent=2)}
Please provide a concise summary focusing on:
1. Main objectives
2. Key actions taken
3. Current status
4. Any issues or blockers
"""
return await self._callAiBasic(prompt)
async def _analyzeTaskResults(self, task: AgentTask) -> Dict[str, Any]:
    """Ask the AI whether the workflow is done and what actions come next.

    Args:
        task: The task whose executed actions should be analyzed.

    Returns:
        Parsed dict with 'isComplete' (bool), 'nextActions' (list of action
        dicts) and 'issues' (list of strings).
    """
    # Get workflow summary
    summary = await self._summarizeWorkflow()
    # Create prompt for analysis; the example pins the expected JSON shape.
    prompt = f"""Analyze the following task results and workflow history to determine next steps:
Task Results:
{json.dumps([a.dict() for a in task.actionList], indent=2)}
Workflow Summary:
{summary}
Please provide a JSON response with:
1. isComplete: Whether the workflow is complete
2. nextActions: List of next actions needed (if any)
3. issues: Any issues or blockers identified
Example format:
{{
"isComplete": false,
"nextActions": [
{{
"method": "sharepoint",
"action": "read",
"parameters": {{
"documentId": "doc123"
}}
}}
],
"issues": ["Need authentication for SharePoint"]
}}
"""
    response = await self._callAiBasic(prompt)
    # NOTE(review): assumes bare JSON in the reply; json.loads raises on
    # prose-wrapped responses — verify upstream handling.
    return json.loads(response)
def _createNextTask(self, current_task: AgentTask, analysis: Dict[str, Any]) -> AgentTask:
    """Build the follow-up task carrying context over from the current one.

    Args:
        current_task: The task whose results were just analyzed.
        analysis: Analysis dict; 'nextActions' lists the planned follow-ups.

    Returns:
        A new pending AgentTask holding the planned actions.
    """
    actions = []
    for action_data in analysis.get('nextActions', []):
        # One timestamp per action so id/createdAt/updatedAt cannot drift.
        now = datetime.now(UTC)
        actions.append(AgentAction(
            id=f"action_{now.timestamp()}",
            method=action_data['method'],
            action=action_data['action'],
            parameters=action_data.get('parameters', {}),
            status=TaskStatus.PENDING,
            createdAt=now,
            updatedAt=now
        ))
    # self.workflow may be unset (initialize() historically only populated
    # the service container), so fall back to the service's reference to
    # avoid AttributeError on self.workflow.id.
    workflow = self.workflow or self.service.workflow
    now = datetime.now(UTC)
    return AgentTask(
        id=f"task_{now.timestamp()}",
        workflowId=workflow.id,
        userInput=current_task.userInput,
        dataList=current_task.dataList,
        actionList=actions,
        status=TaskStatus.PENDING,
        createdAt=now,
        updatedAt=now
    )
async def processTask(self, task: AgentTask) -> Dict[str, Any]:
    """Execute a task end-to-end and package the outcome for the caller.

    Returns:
        Dict with 'status' plus either 'feedback'/'documents' on success
        or 'error'/'feedback' on failure.
    """
    try:
        await self.service.executeTask(task)
        if task.status != TaskStatus.COMPLETED:
            # Task-level failure: surface the task's own error.
            return {
                "status": task.status,
                "error": task.error,
                "feedback": f"Task failed: {task.error}"
            }
        # Success: ask the AI for feedback, then materialize output files.
        feedback = await self._processTaskResults(task)
        task.thisTaskFeedback = feedback
        documents = await self._createOutputDocuments(task)
        task.documentsOutput = documents
        return {
            "status": "success",
            "feedback": feedback,
            "documents": documents
        }
    except Exception as e:
        logger.error(f"Error processing task: {str(e)}")
        return {
            "status": "error",
            "error": str(e),
            "feedback": f"Error processing task: {str(e)}"
        }
def _generateDocumentPrompt(self, task: str) -> str:
    """Build the prompt instructing the AI to emit TaskDocument JSON objects.

    Args:
        task: The user-facing task description to embed in the prompt.

    Returns:
        A prompt requesting a JSON array of TaskDocument dicts with keys
        filename, mimeType, data, base64Encoded.
    """
    return f"""Generate output documents for the following task:
Task: {task}
For each document you need to generate, provide a TaskDocument object with the following structure:
{{
"filename": "string", # Filename with extension
"mimeType": "string", # MIME type of the file
"data": "string", # File content as text or base64
"base64Encoded": boolean # True if data is base64 encoded
}}
Rules:
1. For text files (txt, json, xml, etc.), provide content directly in the data field
2. For binary files (images, videos, etc.), encode content in base64 and set base64Encoded to true
3. Use appropriate MIME types (e.g., text/plain, image/jpeg, application/pdf)
4. Include file extensions in filenames
Return a JSON array of TaskDocument objects.
"""
async def _processTaskResults(self, task: AgentTask) -> str:
    """Ask the AI to generate output documents for the task, then produce feedback.

    Side effect: stores the parsed document list on task.documentsOutput.

    Args:
        task: The completed task whose results should be processed.

    Returns:
        Feedback text, or an error-message string if parsing/processing fails.
    """
    try:
        # Generate document prompt
        docPrompt = self._generateDocumentPrompt(task.userInput)
        # Get AI response for document generation
        docResponse = await self._callAiBasic(docPrompt)
        # Parse response into TaskDocument objects
        try:
            taskDocs = json.loads(docResponse)
            # NOTE(review): json.loads yields plain dicts, while downstream
            # consumers read these entries with attribute access
            # (taskDoc.filename) — verify the expected element type.
            task.documentsOutput = taskDocs
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing document response: {str(e)}")
            return f"Error processing results: {str(e)}"
        # Generate feedback
        feedback = await self._callAiBasic(
            f"""Generate feedback for the completed task:
Task: {task.userInput}
Generated Documents: {len(task.documentsOutput)} files
Provide a concise summary of what was accomplished.
"""
        )
        return feedback
    except Exception as e:
        logger.error(f"Error processing task results: {str(e)}")
        return f"Error processing results: {str(e)}"
async def _createOutputDocuments(self, task: AgentTask) -> List[ChatDocument]:
"""Create output documents from task results"""
try:
fileIds = []
# Process each TaskDocument from AI output
for taskDoc in task.documentsOutput:
# Store file in database
fileItem = self.service.functions.createFile(
name=taskDoc.filename,
mimeType=taskDoc.mimeType
)
# Store file content
if taskDoc.base64Encoded:
# Decode base64 content
content = base64.b64decode(taskDoc.data)
else:
# Use text content directly
content = taskDoc.data.encode('utf-8')
# Store file data
self.service.functions.createFileData(fileItem.id, content)
fileIds.append(fileItem.id)
# Convert all files to ChatDocuments in one call
if fileIds:
return await self.service.chat.processFileIds(fileIds)
return []
except Exception as e:
logger.error(f"Error creating output documents: {str(e)}")
return []
async def _callAiBasic(self, prompt: str) -> str:
"""Call basic AI service"""
try:
if not self.service or not self.service.base:
raise ValueError("Service or base interface not initialized")
return await self.service.base.callAi([
{"role": "system", "content": "You are an AI assistant that helps process user requests."},
{"role": "user", "content": prompt}
])
except Exception as e:
logger.error(f"Error calling AI service: {str(e)}")
raise
async def _callAiAdvanced(self, prompt: str, context: Dict[str, Any]) -> str:
"""Call advanced AI model with context"""
# TODO: Implement actual AI call
return "AI response placeholder"
async def generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
"""
Generates a final feedback message for the workflow in the user's language.
Args:
workflow: The completed workflow to generate feedback for
Returns:
str: The generated feedback message
"""
try:
# Get workflow summary
workflowSummary = {
"status": workflow.status,
"totalMessages": len(workflow.messages),
"totalDocuments": sum(len(msg.documents) for msg in workflow.messages),
"duration": (datetime.now(UTC) - datetime.fromisoformat(workflow.startedAt)).total_seconds()
}
# Get user language from workflow mandate
userLanguage = workflow.mandateId.split('_')[0] if workflow.mandateId else 'en'
# Prepare messages for AI context
messages = [
{
"role": "system",
"content": f"You are an AI assistant providing a summary of a completed workflow. "
f"Please respond in '{userLanguage}' language. "
f"Summarize the workflow's activities, outcomes, and any important points. "
f"Be concise but informative. Use a professional but friendly tone."
},
{
"role": "user",
"content": f"Please provide a summary of this workflow:\n"
f"Status: {workflowSummary['status']}\n"
f"Total Messages: {workflowSummary['totalMessages']}\n"
f"Total Documents: {workflowSummary['totalDocuments']}\n"
f"Duration: {workflowSummary['duration']:.1f} seconds"
}
]
# Add relevant workflow messages for context
for msg in workflow.messages:
if msg.role == "user" or msg.status in ["first", "last"]:
messages.append({
"role": msg.role,
"content": msg.message
})
# Generate feedback using AI
feedback = await self.service.aiService.callApi(messages, temperature=0.7)
return feedback
except Exception as e:
logger.error(f"Error generating workflow feedback: {str(e)}")
return "Workflow completed successfully."