From 68d5a4aa208451a64d4b0e48f96fc614da2128b4 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 27 May 2025 14:16:52 +0200
Subject: [PATCH] refactored workflow with component playground and document
level exchange
---
modules/interfaces/serviceChatModel.py | 40 +-
modules/workflow/agentBase.py | 116 +++--
modules/workflow/agentManager.py | 271 ++++++++++
modules/workflow/documentManager.py | 174 +++++++
modules/workflow/taskManager.py | 215 ++++++++
modules/workflow/workflowManager.py | 652 +++++++++----------------
notes/changelog.txt | 33 --
7 files changed, 1014 insertions(+), 487 deletions(-)
create mode 100644 modules/workflow/agentManager.py
create mode 100644 modules/workflow/documentManager.py
create mode 100644 modules/workflow/taskManager.py
diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py
index b220b6c7..6fcd9b89 100644
--- a/modules/interfaces/serviceChatModel.py
+++ b/modules/interfaces/serviceChatModel.py
@@ -36,6 +36,8 @@ class ChatStat(BaseModelWithUI):
tokenCount: Optional[int] = Field(None, description="Number of tokens processed")
bytesSent: Optional[int] = Field(None, description="Number of bytes sent")
bytesReceived: Optional[int] = Field(None, description="Number of bytes received")
+ successRate: Optional[float] = Field(None, description="Success rate of operations")
+ errorCount: Optional[int] = Field(None, description="Number of errors encountered")
class ChatLog(BaseModelWithUI):
"""Data model for a chat log"""
@@ -47,6 +49,7 @@ class ChatLog(BaseModelWithUI):
agentName: str = Field(description="Name of the agent")
status: str = Field(description="Status of the log entry")
progress: Optional[int] = Field(None, description="Progress percentage")
+ performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
class ChatMessage(BaseModelWithUI):
"""Data model for a chat message"""
@@ -62,6 +65,7 @@ class ChatMessage(BaseModelWithUI):
startedAt: str = Field(description="When the message processing started")
finishedAt: Optional[str] = Field(None, description="When the message processing finished")
stats: Optional[ChatStat] = Field(None, description="Statistics for this message")
+ success: Optional[bool] = Field(None, description="Whether the message processing was successful")
class ChatWorkflow(BaseModelWithUI):
"""Data model for a chat workflow"""
@@ -75,6 +79,7 @@ class ChatWorkflow(BaseModelWithUI):
logs: List[ChatLog] = Field(default_factory=list, description="Workflow logs")
messages: List[ChatMessage] = Field(default_factory=list, description="Messages in the workflow")
stats: Optional[ChatStat] = Field(None, description="Workflow statistics")
+ tasks: List['Task'] = Field(default_factory=list, description="List of tasks in the workflow")
label: Label = Field(
default=Label(default="Chat Workflow", translations={"en": "Chat Workflow", "fr": "Flux de travail de chat"}),
@@ -91,7 +96,8 @@ class ChatWorkflow(BaseModelWithUI):
"startedAt": Label(default="Started At", translations={"en": "Started At", "fr": "Démarré le"}),
"logs": Label(default="Logs", translations={"en": "Logs", "fr": "Journaux"}),
"messages": Label(default="Messages", translations={"en": "Messages", "fr": "Messages"}),
- "stats": Label(default="Statistics", translations={"en": "Statistics", "fr": "Statistiques"})
+ "stats": Label(default="Statistics", translations={"en": "Statistics", "fr": "Statistiques"}),
+ "tasks": Label(default="Tasks", translations={"en": "Tasks", "fr": "Tâches"})
}
# AGENT AND TASK MODELS
@@ -102,29 +108,41 @@ class Agent(BaseModelWithUI):
name: str = Field(description="Name of the agent")
description: str = Field(description="Description of the agent")
capabilities: List[str] = Field(default_factory=list, description="List of agent capabilities")
+ performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
class AgentResponse(BaseModelWithUI):
"""Data model for an agent response"""
- response: str = Field(description="Response content from the agent")
- documents: List[ChatDocument] = Field(default_factory=list, description="Documents associated with the response")
+ success: bool = Field(description="Whether the agent execution was successful")
+ message: ChatMessage = Field(description="Response message from the agent")
+ performance: Dict[str, Any] = Field(default_factory=dict, description="Performance metrics")
+ progress: float = Field(description="Task progress (0-100)")
-class TaskItem(BaseModelWithUI):
- """Data model for a task item"""
- sequenceNr: int = Field(description="Sequence number of the task")
+class Task(BaseModelWithUI):
+ """Data model for a task"""
+ id: str = Field(description="Primary key")
+ workflowId: str = Field(description="Foreign key to workflow")
agentName: str = Field(description="Name of the agent assigned to this task")
+ status: str = Field(description="Current status of the task")
+ progress: float = Field(description="Task progress (0-100)")
prompt: str = Field(description="Prompt for the task")
userLanguage: str = Field(description="User's preferred language")
- filesInput: List[str] = Field(default_factory=list, description="Input files (format: filename;[documentId])")
- filesOutput: List[str] = Field(default_factory=list, description="Output files (format: filename)")
+ filesInput: List[str] = Field(default_factory=list, description="Input files")
+ filesOutput: List[str] = Field(default_factory=list, description="Output files")
+ result: Optional[ChatMessage] = Field(None, description="Task result message")
+ error: Optional[str] = Field(None, description="Error message if failed")
+ startedAt: str = Field(description="When the task started")
+ finishedAt: Optional[str] = Field(None, description="When the task finished")
+ performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
class TaskPlan(BaseModelWithUI):
"""Data model for a task plan"""
- fileList: List[str] = Field(default_factory=list, description="List of files (format: filename)")
- taskItems: List[TaskItem] = Field(default_factory=list, description="List of task items in the plan")
+ fileList: List[str] = Field(default_factory=list, description="List of files")
+ tasks: List[Task] = Field(default_factory=list, description="List of tasks in the plan")
userLanguage: str = Field(description="User's preferred language")
userResponse: str = Field(description="User's response or feedback")
class UserInputRequest(BaseModelWithUI):
"""Data model for a user input request"""
prompt: str = Field(description="Prompt for the user")
- listFileId: List[int] = Field(default_factory=list, description="List of file IDs")
\ No newline at end of file
+ listFileId: List[int] = Field(default_factory=list, description="List of file IDs")
+ userLanguage: str = Field(description="User's preferred language")
\ No newline at end of file
diff --git a/modules/workflow/agentBase.py b/modules/workflow/agentBase.py
index a8bc33ae..0c79681a 100644
--- a/modules/workflow/agentBase.py
+++ b/modules/workflow/agentBase.py
@@ -7,10 +7,10 @@ Defines the standardized interface for task processing.
import os
import logging
import uuid
-from datetime import datetime
+from datetime import datetime, UTC
from typing import Dict, Any, List, Optional
from modules.shared.mimeUtils import isTextMimeType, determineContentEncoding
-from modules.interfaces.serviceChatModel import ChatContent
+from modules.interfaces.serviceChatModel import ChatContent, Task, AgentResponse, ChatMessage
logger = logging.getLogger(__name__)
@@ -67,45 +67,99 @@ class AgentBase:
"capabilities": self.capabilities
}
- async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]:
+ async def execute(self, task: Task) -> AgentResponse:
"""
- Process a standardized task structure and return results.
+ Execute a task and return the response.
This method must be implemented by all concrete agent classes.
Args:
- task: A dictionary containing:
- - taskId: Unique ID for this task
- - workflowId: ID of the parent workflow
- - prompt: The main instruction for the agent
- - inputDocuments: List of document objects to process
- - outputSpecifications: List of required output documents
- - context: Additional contextual information including:
- - workflow: The complete workflow object
- - workflowRound: Current workflow round
- - agentType: Type of agent
- - timestamp: Task timestamp
- - language: User language
-
+ task: Task object containing all necessary information
+
Returns:
- A dictionary containing:
- - feedback: Text response explaining what the agent did
- - documents: List of document objects created by the agent,
- each containing a "base64Encoded" flag in addition to "label" and "content"
+ AgentResponse object with execution results
"""
# Validate service manager
if not self.service:
logger.error("Service container not initialized")
- return {
- "feedback": "Error: Service container not initialized",
- "documents": []
- }
+ return AgentResponse(
+ success=False,
+ message=ChatMessage(
+ id=str(uuid.uuid4()),
+ workflowId=task.workflowId,
+ agentName=self.name,
+ message="Error: Service container not initialized",
+ role="system",
+ status="error",
+ sequenceNr=0,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=False
+ ),
+ performance={},
+ progress=0.0
+ )
+
+ try:
+ # Process the task using the concrete implementation
+ result = await self.processTask(task)
- # Base implementation - should be overridden by specialized agents
- logger.warning(f"Agent {self.name} is using the default implementation of processTask")
- return {
- "feedback": f"The processTask method was not implemented by agent '{self.name}'.",
- "documents": []
- }
+ # Create response message
+ message = ChatMessage(
+ id=str(uuid.uuid4()),
+ workflowId=task.workflowId,
+ agentName=self.name,
+ message=result.get("feedback", ""),
+ role="assistant",
+ status="completed",
+ sequenceNr=0,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=True
+ )
+
+ # Create response with performance metrics
+ return AgentResponse(
+ success=True,
+ message=message,
+ performance=result.get("performance", {}),
+ progress=result.get("progress", 100.0)
+ )
+
+ except Exception as e:
+ logger.error(f"Error processing task: {str(e)}", exc_info=True)
+ return AgentResponse(
+ success=False,
+ message=ChatMessage(
+ id=str(uuid.uuid4()),
+ workflowId=task.workflowId,
+ agentName=self.name,
+ message=f"Error processing task: {str(e)}",
+ role="system",
+ status="error",
+ sequenceNr=0,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=False
+ ),
+ performance={},
+ progress=0.0
+ )
+
+ async def processTask(self, task: Task) -> Dict[str, Any]:
+ """
+ Process a task and return the results.
+ This method must be implemented by all concrete agent classes.
+
+ Args:
+ task: Task object containing all necessary information
+
+ Returns:
+ Dictionary containing:
+ - feedback: Text response explaining what the agent did
+ - performance: Optional performance metrics
+ - progress: Task progress (0-100)
+ """
+ raise NotImplementedError("processTask must be implemented by concrete agent classes")
def determineBase64EncodingFlag(self, filename: str, content: Any, mimeType: str = None) -> bool:
"""
diff --git a/modules/workflow/agentManager.py b/modules/workflow/agentManager.py
new file mode 100644
index 00000000..0b97adca
--- /dev/null
+++ b/modules/workflow/agentManager.py
@@ -0,0 +1,271 @@
+"""
+Agent Manager Module for managing, initializing, and executing agents.
+"""
+
+import os
+import logging
+import importlib
+import asyncio
+from typing import Dict, Any, List, Optional, Tuple
+from datetime import datetime, UTC
+from modules.workflow.agentBase import AgentBase
+from modules.interfaces.serviceChatModel import AgentResponse, Task, ChatMessage
+import uuid
+from modules.workflow.taskManager import getTaskManager
+
+logger = logging.getLogger(__name__)
+
+class AgentManager:
+ """Central manager for all agents in the system, handling registration, initialization, and execution."""
+
+ _instance = None
+
+ @classmethod
+ def getInstance(cls):
+ """Return a singleton instance of the agent manager."""
+ if cls._instance is None:
+ cls._instance = cls()
+ return cls._instance
+
+ def __init__(self):
+ """Initialize the agent manager."""
+ if AgentManager._instance is not None:
+ raise RuntimeError("Singleton instance already exists - use getInstance()")
+
+ self.agents: Dict[str, AgentBase] = {}
+ self.service = None
+ self.taskManager = getTaskManager()
+ self._loadAgents()
+
+ def initialize(self, service=None):
+ """Initialize or update the manager with service references."""
+ if service:
+ # Validate required interfaces
+ required_interfaces = ['base', 'msft', 'google']
+ missing_interfaces = []
+ for interface in required_interfaces:
+ if not hasattr(service, interface):
+ missing_interfaces.append(interface)
+
+ if missing_interfaces:
+ logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}")
+ return False
+
+ self.service = service
+
+ # Initialize agents with service
+ for agent in self.agents.values():
+ if service and hasattr(agent, 'setService'):
+ agent.setService(service)
+
+ return True
+
+ def _loadAgents(self):
+ """Load all available agents from modules."""
+ logger.info("Loading agent modules...")
+
+ # List of agent modules to load
+ agentModules = []
+ agentDir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "agents")
+
+ # Search the directory for agent modules
+ for filename in os.listdir(agentDir):
+ if filename.startswith("agent") and filename.endswith(".py"):
+ agentModules.append(filename[0:-3]) # Remove .py extension
+
+ if not agentModules:
+ logger.warning("No agent modules found")
+ return
+
+ logger.info(f"{len(agentModules)} agent modules found")
+
+ # Load each agent module
+ for moduleName in agentModules:
+ try:
+ # Import the module
+ module = importlib.import_module(f"modules.agents.{moduleName}")
+
+ # Look for agent class or get_*_agent function
+ agentName = moduleName.split("agent")[-1]
+ className = f"Agent{agentName}"
+ getterName = f"getAgent{agentName}"
+
+ agent = None
+
+ # Try to get the agent via the get*Agent function
+ if hasattr(module, getterName):
+ getterFunc = getattr(module, getterName)
+ agent = getterFunc()
+ logger.info(f"Agent '{agent.name}' loaded via {getterName}()")
+
+ # Alternatively, try to instantiate the agent directly
+ elif hasattr(module, className):
+ agentClass = getattr(module, className)
+ agent = agentClass()
+ logger.info(f"Agent '{agent.name}' directly instantiated")
+
+ if agent:
+ # Register the agent
+ self.registerAgent(agent)
+ else:
+ logger.warning(f"No agent class or getter function found in module {moduleName}")
+
+ except ImportError as e:
+ logger.error(f"Module {moduleName} could not be imported: {e}")
+ except Exception as e:
+ logger.error(f"Error loading agent from module {moduleName}: {e}")
+
+ def registerAgent(self, agent: AgentBase):
+ """
+ Register an agent in the manager.
+
+ Args:
+ agent: The agent to register
+ """
+ agentId = getattr(agent, 'name', "unknown_agent")
+ self.agents[agentId] = agent
+ logger.debug(f"Agent '{agent.name}' registered")
+
+ def getAgent(self, agentIdentifier: str) -> Optional[AgentBase]:
+ """
+ Return an agent instance.
+
+ Args:
+ agentIdentifier: ID or type of the desired agent
+
+ Returns:
+ Agent instance or None if not found
+ """
+ if agentIdentifier in self.agents:
+ return self.agents[agentIdentifier]
+ logger.error(f"Agent with identifier '{agentIdentifier}' not found")
+ return None
+
+ def getAllAgents(self) -> Dict[str, AgentBase]:
+ """
+ Get all registered agents.
+
+ Returns:
+ Dictionary mapping agent names to agent instances
+ """
+ return self.agents.copy()
+
+ def getAgentInfos(self) -> List[Dict[str, Any]]:
+ """Return information about all registered agents."""
+ agentInfos = []
+ seenAgents = set()
+ for agent in self.agents.values():
+ if agent not in seenAgents:
+ agentInfos.append(agent.getAgentInfo())
+ seenAgents.add(agent)
+ return agentInfos
+
+ async def executeAgent(self, task: Task) -> Tuple[AgentResponse, Task]:
+ """
+ Execute an agent for a given task.
+
+ Args:
+ task: The task to execute
+
+ Returns:
+ Tuple of (AgentResponse, updated Task)
+ """
+ agent = self.getAgent(task.agentName)
+ if not agent:
+ error_msg = f"Agent '{task.agentName}' not found"
+ logger.error(error_msg)
+ return (
+ AgentResponse(
+ success=False,
+ message=ChatMessage(
+ id=str(uuid.uuid4()),
+ workflowId=task.workflowId,
+ agentName=task.agentName,
+ message=error_msg,
+ role="system",
+ status="error",
+ sequenceNr=0,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=False
+ ),
+ performance={},
+ progress=0.0
+ ),
+ Task(**{**task.model_dump(), "status": "failed", "error": error_msg})
+ )
+
+ try:
+ # Update task status
+ task = self.taskManager.updateTaskStatus(task, "running")
+ task.startedAt = datetime.now(UTC).isoformat()
+
+ # Execute agent
+ startTime = datetime.now(UTC)
+ response = await agent.execute(task)
+ endTime = datetime.now(UTC)
+
+ # Calculate performance metrics
+ duration = (endTime - startTime).total_seconds()
+ performance = {
+ "duration": duration,
+ "startTime": startTime.isoformat(),
+ "endTime": endTime.isoformat()
+ }
+
+ # Update task with result
+ task.status = "completed" if response.success else "failed"
+ task.finishedAt = endTime.isoformat()
+ task.result = response.message
+ task.progress = response.progress
+ task.performance = performance
+
+ if not response.success:
+ task.error = response.message.message if response.message else "Unknown error"
+
+ # Create response
+ response = AgentResponse(
+ success=response.success,
+ message=response.message,
+ performance=performance
+ )
+
+ # Update task status
+ if response.success:
+ task = self.taskManager.completeTask(task, response.message)
+ else:
+ task = self.taskManager.handleTaskError(task, response.message.message if response.message else "Unknown error")
+
+ return response, task
+
+ except Exception as e:
+ error_msg = f"Error executing agent '{task.agentName}': {str(e)}"
+ logger.error(error_msg, exc_info=True)
+
+ # Create error response
+ error_response = AgentResponse(
+ success=False,
+ message=ChatMessage(
+ id=str(uuid.uuid4()),
+ workflowId=task.workflowId,
+ agentName=task.agentName,
+ message=error_msg,
+ role="system",
+ status="error",
+ sequenceNr=0,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=False
+ ),
+ performance={},
+ progress=0.0
+ )
+
+ # Update task with error
+ task = self.taskManager.handleTaskError(task, error_msg)
+
+ return error_response, task
+
+# Singleton factory for the agent manager
+def getAgentManager():
+ return AgentManager.getInstance()
\ No newline at end of file
diff --git a/modules/workflow/documentManager.py b/modules/workflow/documentManager.py
new file mode 100644
index 00000000..484daf5e
--- /dev/null
+++ b/modules/workflow/documentManager.py
@@ -0,0 +1,174 @@
+"""
+Document Manager Module for handling document operations and content extraction.
+"""
+
+import logging
+from typing import Dict, Any, List, Optional
+from datetime import datetime
+from modules.interfaces.serviceChatModel import ChatDocument, ChatContent
+from modules.workflow.documentProcessor import getDocumentContents
+
+logger = logging.getLogger(__name__)
+
+class DocumentManager:
+ """Manager for document operations and content extraction."""
+
+ _instance = None
+
+ @classmethod
+ def getInstance(cls):
+ """Return a singleton instance of the document manager."""
+ if cls._instance is None:
+ cls._instance = cls()
+ return cls._instance
+
+ def __init__(self):
+ """Initialize the document manager."""
+ if DocumentManager._instance is not None:
+ raise RuntimeError("Singleton instance already exists - use getInstance()")
+
+ self.service = None
+
+ def initialize(self, service=None):
+ """Initialize or update the manager with service references."""
+ if service:
+ # Validate required interfaces
+ required_interfaces = ['base', 'msft', 'google']
+ missing_interfaces = []
+ for interface in required_interfaces:
+ if not hasattr(service, interface):
+ missing_interfaces.append(interface)
+
+ if missing_interfaces:
+ logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}")
+ return False
+
+ self.service = service
+ return True
+
+ async def extractContent(self, fileId: int) -> Optional[ChatDocument]:
+ """
+ Extract content from a file.
+
+ Args:
+ fileId: ID of the file to process
+
+ Returns:
+ ChatDocument object with extracted content or None if processing failed
+ """
+ try:
+ # Get file metadata and content from service
+ fileMetadata = await self.service.base.getFileMetadata(fileId)
+ fileContent = await self.service.base.getFileContent(fileId)
+
+ if not fileMetadata or not fileContent:
+ logger.error(f"Could not retrieve file data for fileId {fileId}")
+ return None
+
+ # Extract content using documentProcessor
+ contents = getDocumentContents(fileMetadata, fileContent)
+
+ # Create ChatDocument
+ return ChatDocument(
+ id=str(fileId), # Using fileId as document id
+ fileId=fileId,
+ filename=fileMetadata.get("name", "unknown"),
+ fileSize=fileMetadata.get("size", 0),
+ mimeType=fileMetadata.get("mimeType", "application/octet-stream"),
+ contents=contents
+ )
+
+ except Exception as e:
+ logger.error(f"Error extracting content from file {fileId}: {str(e)}", exc_info=True)
+ return None
+
+ async def processFileIds(self, fileIds: List[int]) -> List[ChatDocument]:
+ """
+ Process multiple files and extract their contents.
+
+ Args:
+ fileIds: List of file IDs to process
+
+ Returns:
+ List of ChatDocument objects
+ """
+ documents = []
+ for fileId in fileIds:
+ try:
+ document = await self.extractContent(fileId)
+ if document:
+ documents.append(document)
+ except Exception as e:
+ logger.error(f"Error processing file {fileId}: {str(e)}")
+ continue
+ return documents
+
+ async def getFileContent(self, fileId: int) -> Optional[bytes]:
+ """
+ Get raw file content.
+
+ Args:
+ fileId: ID of the file
+
+ Returns:
+ File content as bytes or None if not found
+ """
+ try:
+ return await self.service.base.getFileContent(fileId)
+ except Exception as e:
+ logger.error(f"Error getting file content for {fileId}: {str(e)}")
+ return None
+
+ async def getFileMetadata(self, fileId: int) -> Optional[Dict[str, Any]]:
+ """
+ Get file metadata.
+
+ Args:
+ fileId: ID of the file
+
+ Returns:
+ File metadata dictionary or None if not found
+ """
+ try:
+ return await self.service.base.getFileMetadata(fileId)
+ except Exception as e:
+ logger.error(f"Error getting file metadata for {fileId}: {str(e)}")
+ return None
+
+ async def saveFile(self, filename: str, content: bytes, mimeType: str) -> Optional[int]:
+ """
+ Save a new file.
+
+ Args:
+ filename: Name of the file
+ content: File content as bytes
+ mimeType: MIME type of the file
+
+ Returns:
+ File ID if successful, None otherwise
+ """
+ try:
+ return await self.service.base.saveFile(filename, content, mimeType)
+ except Exception as e:
+ logger.error(f"Error saving file {filename}: {str(e)}")
+ return None
+
+ async def deleteFile(self, fileId: int) -> bool:
+ """
+ Delete a file.
+
+ Args:
+ fileId: ID of the file to delete
+
+ Returns:
+ True if successful, False otherwise
+ """
+ try:
+ return await self.service.base.deleteFile(fileId)
+ except Exception as e:
+ logger.error(f"Error deleting file {fileId}: {str(e)}")
+ return False
+
+# Singleton factory for the document manager
+def getDocumentManager():
+ return DocumentManager.getInstance()
\ No newline at end of file
diff --git a/modules/workflow/taskManager.py b/modules/workflow/taskManager.py
new file mode 100644
index 00000000..63a4e08c
--- /dev/null
+++ b/modules/workflow/taskManager.py
@@ -0,0 +1,215 @@
+"""
+Task Manager Module for managing task states and transitions.
+"""
+
+import logging
+from typing import Dict, Any, List, Optional
+from datetime import datetime, UTC
+import uuid
+from modules.interfaces.serviceChatModel import Task, ChatLog, ChatMessage
+
+logger = logging.getLogger(__name__)
+
+class TaskManager:
+ """Manager for task state management and transitions."""
+
+ _instance = None
+
+ @classmethod
+ def getInstance(cls):
+ """Return a singleton instance of the task manager."""
+ if cls._instance is None:
+ cls._instance = cls()
+ return cls._instance
+
+ def __init__(self):
+ """Initialize the task manager."""
+ if TaskManager._instance is not None:
+ raise RuntimeError("Singleton instance already exists - use getInstance()")
+
+ self.service = None
+
+ def initialize(self, service=None):
+ """Initialize or update the manager with service references."""
+ if service:
+ # Validate required interfaces
+ required_interfaces = ['base', 'msft', 'google']
+ missing_interfaces = []
+ for interface in required_interfaces:
+ if not hasattr(service, interface):
+ missing_interfaces.append(interface)
+
+ if missing_interfaces:
+ logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}")
+ return False
+
+ self.service = service
+ return True
+
+ def createTask(self, workflowId: str, agentName: str, prompt: str, userLanguage: str,
+ filesInput: List[str] = None, filesOutput: List[str] = None) -> Task:
+ """
+ Create a new task.
+
+ Args:
+ workflowId: ID of the workflow this task belongs to
+ agentName: Name of the agent to execute the task
+ prompt: Task prompt
+ userLanguage: User's preferred language
+ filesInput: List of input files
+ filesOutput: List of output files
+
+ Returns:
+ New Task object
+ """
+ return Task(
+ id=str(uuid.uuid4()),
+ workflowId=workflowId,
+ agentName=agentName,
+ status="pending",
+ progress=0.0,
+ prompt=prompt,
+ userLanguage=userLanguage,
+ filesInput=filesInput or [],
+ filesOutput=filesOutput or [],
+ startedAt=datetime.now(UTC).isoformat()
+ )
+
+ def updateTaskStatus(self, task: Task, newStatus: str, progress: float = None,
+ error: str = None, result: ChatMessage = None) -> Task:
+ """
+ Update task status and related fields.
+
+ Args:
+ task: Task to update
+ newStatus: New status value
+ progress: Optional progress value
+ error: Optional error message
+ result: Optional result message
+
+ Returns:
+ Updated Task object
+ """
+ # Validate status transition
+ valid_transitions = {
+ "pending": ["running", "failed"],
+ "running": ["completed", "failed"],
+ "completed": [],
+ "failed": []
+ }
+
+ if newStatus not in valid_transitions.get(task.status, []):
+ logger.warning(f"Invalid status transition from {task.status} to {newStatus}")
+ return task
+
+ # Update task fields
+ task.status = newStatus
+ if progress is not None:
+ task.progress = progress
+ if error is not None:
+ task.error = error
+ if result is not None:
+ task.result = result
+
+ # Update timestamps
+ if newStatus in ["completed", "failed"]:
+ task.finishedAt = datetime.now(UTC).isoformat()
+
+ return task
+
+ def createTaskLog(self, task: Task, message: str, logType: str = "info") -> ChatLog:
+ """
+ Create a log entry for a task.
+
+ Args:
+ task: Task to create log for
+ message: Log message
+ logType: Type of log entry
+
+ Returns:
+ New ChatLog object
+ """
+ return ChatLog(
+ id=str(uuid.uuid4()),
+ workflowId=task.workflowId,
+ message=message,
+ type=logType,
+ timestamp=datetime.now(UTC).isoformat(),
+ agentName=task.agentName,
+ status=task.status,
+ progress=task.progress
+ )
+
+ def updateTaskProgress(self, task: Task, progress: float, message: str = None) -> Task:
+ """
+ Update task progress and optionally create a log entry.
+
+ Args:
+ task: Task to update
+ progress: New progress value (0-100)
+ message: Optional progress message
+
+ Returns:
+ Updated Task object
+ """
+ # Validate progress value
+ if not 0 <= progress <= 100:
+ logger.warning(f"Invalid progress value: {progress}")
+ return task
+
+ # Update progress
+ task.progress = progress
+
+ # Create log entry if message provided
+ if message:
+ log = self.createTaskLog(task, message, "progress")
+ if self.service and hasattr(self.service, 'logAdd'):
+ self.service.logAdd(log)
+
+ return task
+
+ def handleTaskError(self, task: Task, error: str) -> Task:
+ """
+ Handle task error and update task state.
+
+ Args:
+ task: Task to update
+ error: Error message
+
+ Returns:
+ Updated Task object
+ """
+ # Update task status
+ task = self.updateTaskStatus(task, "failed", error=error)
+
+ # Create error log
+ log = self.createTaskLog(task, f"Task failed: {error}", "error")
+ if self.service and hasattr(self.service, 'logAdd'):
+ self.service.logAdd(log)
+
+ return task
+
+ def completeTask(self, task: Task, result: ChatMessage) -> Task:
+ """
+ Mark task as completed and set result.
+
+ Args:
+ task: Task to complete
+ result: Result message
+
+ Returns:
+ Updated Task object
+ """
+ # Update task status
+ task = self.updateTaskStatus(task, "completed", progress=100.0, result=result)
+
+ # Create completion log
+ log = self.createTaskLog(task, "Task completed successfully", "info")
+ if self.service and hasattr(self.service, 'logAdd'):
+ self.service.logAdd(log)
+
+ return task
+
+# Singleton factory for the task manager
+def getTaskManager():
+ return TaskManager.getInstance()
\ No newline at end of file
diff --git a/modules/workflow/workflowManager.py b/modules/workflow/workflowManager.py
index 005815c6..28294960 100644
--- a/modules/workflow/workflowManager.py
+++ b/modules/workflow/workflowManager.py
@@ -9,17 +9,18 @@ import logging
import json
import uuid
import base64
-from datetime import datetime, timedelta
-from typing import Dict, Any, List, Optional, Union, Tuple
+from datetime import datetime, UTC, timedelta
+from typing import Dict, Any, List, Optional, Union, Tuple, Callable, TypedDict, Protocol
import time
from modules.shared.mimeUtils import isTextMimeType
# Required imports
-from modules.workflow.agentRegistry import getAgentRegistry
-from modules.workflow.documentProcessor import getDocumentContents
+from modules.workflow.agentManager import getAgentManager
+from modules.workflow.taskManager import getTaskManager
+from modules.workflow.documentManager import getDocumentManager
from modules.interfaces.serviceChatModel import (
UserInputRequest, ChatWorkflow, ChatMessage, ChatLog,
- ChatDocument, ChatStat, Workflow
+ ChatDocument, ChatStat, Workflow, Task, AgentResponse
)
# Configure logger
@@ -42,16 +43,165 @@ class WorkflowStoppedException(Exception):
"""Exception raised when a workflow is forcibly stopped with function checkExitCriteria() """
pass
+class ServiceObject:
+ """Service object structure available to agents."""
+ def __init__(self):
+ self.user: Dict[str, Any] = {} # User context
+ self.operator: Dict[str, Callable] = {} # Document operations
+ self.workflow: Dict[str, Any] = {} # Workflow context
+ self.functions: Any = None # Core functions
+ self.logAdd: Callable = None # Logging function
+
class WorkflowManager:
"""Manages the execution of workflows and their associated agents."""
- def __init__(self, service):
+ def __init__(self, service: ServiceObject):
"""Initialize the workflow manager with service container."""
# Store service container
self.service = service
self.service.logAdd = self.logAdd
- self.agentRegistry = getAgentRegistry()
- self.agentRegistry.initialize(service=self.service)
+
+ # Initialize managers
+ self.agentManager = getAgentManager()
+ self.taskManager = getTaskManager()
+ self.documentManager = getDocumentManager()
+
+ # Initialize managers with service
+ self.agentManager.initialize(service=self.service)
+ self.documentManager.initialize(service=self.service)
+
+ # Add agent service functionality directly to service object
+ service.user = {
+ 'attributes': service.user.get('attributes', {}),
+ 'connection': service.user.get('connection', [])
+ }
+
+ # Add operator functions
+ service.operator = {
+ 'forEach': lambda items, func: [func(item) for item in items],
+ 'aiCall': service.functions.callAi,
+ 'extract': lambda file: self.documentManager.extractContent(file),
+ 'fileRefToFileId': lambda ref: self.documentManager.convertFileRefToId(ref),
+ 'fileIdToFileRef': lambda fileId: self.documentManager.convertFileIdToRef(fileId),
+ 'convert': lambda data, format: self.documentManager.convertDataFormat(data, format),
+ 'createAgentInputFiles': lambda files: self.documentManager.createAgentInputFileList(files),
+ 'saveAgentOutputFiles': lambda files: self.documentManager.saveAgentOutputFiles(files)
+ }
+
+ # Add workflow context
+ service.workflow = {
+ 'activeTask': {
+ 'id': None,
+ 'progress': 0,
+ 'status': 'pending'
+ },
+ 'tasks': []
+ }
+
+ def _extractFileContent(self, file):
+ """Extract content from a file for agent processing."""
+ try:
+ fileData = self.service.functions.getFileData(file['id'])
+ if fileData is None:
+ return None
+
+ # Handle base64 encoded content
+ if file.get('base64Encoded', False):
+ import base64
+ return base64.b64decode(fileData)
+
+ # Handle text content
+ if isinstance(fileData, bytes):
+ return fileData.decode('utf-8')
+ return fileData
+
+ except Exception as e:
+ logger.error(f"Error extracting file content: {str(e)}")
+ return None
+
+ def _convertFileRefToId(self, ref):
+ """Convert agent file reference to file ID."""
+ try:
+ # Extract file ID from reference format
+ if isinstance(ref, str) and ';' in ref:
+ return int(ref.split(';')[1])
+ return int(ref)
+ except Exception as e:
+ logger.error(f"Error converting file reference to ID: {str(e)}")
+ return None
+
+ def _convertFileIdToRef(self, fileId):
+ """Convert file ID to agent file reference."""
+ try:
+ file = self.service.functions.getFile(fileId)
+ if not file:
+ return None
+ return f"{file['name']};{fileId}"
+ except Exception as e:
+ logger.error(f"Error converting file ID to reference: {str(e)}")
+ return None
+
+ def _convertDataFormat(self, data, format):
+ """Convert data between different formats."""
+ try:
+ if format == 'json':
+ if isinstance(data, str):
+ return json.loads(data)
+ return json.dumps(data)
+ elif format == 'base64':
+ import base64
+ if isinstance(data, str):
+ return base64.b64encode(data.encode('utf-8')).decode('utf-8')
+ return base64.b64encode(data).decode('utf-8')
+ return data
+ except Exception as e:
+ logger.error(f"Error converting data format: {str(e)}")
+ return data
+
+ def _createAgentInputFileList(self, files):
+ """Create a list of input files for agent processing."""
+ try:
+ inputFiles = []
+ for file in files:
+ fileId = self._convertFileRefToId(file)
+ if fileId:
+ fileData = self.service.functions.getFile(fileId)
+ if fileData:
+ inputFiles.append({
+ 'id': fileId,
+ 'name': fileData['name'],
+ 'mimeType': fileData['mimeType'],
+ 'content': self._extractFileContent(fileData)
+ })
+ return inputFiles
+ except Exception as e:
+ logger.error(f"Error creating agent input file list: {str(e)}")
+ return []
+
+ def _saveAgentOutputFiles(self, files):
+ """Save output files from agent processing."""
+ try:
+ savedFiles = []
+ for file in files:
+ # Create file metadata
+ fileMeta = self.service.functions.createFile(
+ name=file['name'],
+ mimeType=file.get('mimeType', 'application/octet-stream'),
+ size=len(file['content'])
+ )
+
+ if fileMeta and 'id' in fileMeta:
+ # Save file content
+ if self.service.functions.createFileData(fileMeta['id'], file['content']):
+ savedFiles.append({
+ 'id': fileMeta['id'],
+ 'name': file['name'],
+ 'mimeType': file.get('mimeType', 'application/octet-stream')
+ })
+ return savedFiles
+ except Exception as e:
+ logger.error(f"Error saving agent output files: {str(e)}")
+ return []
async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
"""Starts a new workflow or continues an existing one."""
@@ -78,7 +228,7 @@ class WorkflowManager:
# Raise an exception to stop execution
raise WorkflowStoppedException(f"Workflow execution stopped due to status: {current_workflow['status']}")
- async def workflowProcess(self, userInput: Dict[str, Any], workflow: ChatWorkflow) -> ChatWorkflow:
+ async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatWorkflow:
"""
Main processing function that implements the workflow state machine.
Handles the complete workflow process from user input to final response.
@@ -94,7 +244,10 @@ class WorkflowManager:
try:
# State 3: User Message Processing
self.checkExitCriteria(workflow)
- messageUser = await self.chatMessageToWorkflow("user", None, userInput, workflow)
+ messageUser = await self.chatMessageToWorkflow("user", None, {
+ "prompt": userInput.prompt,
+ "listFileId": userInput.listFileId
+ }, workflow)
messageUser.status = "first" # For first message
# State 4: Project Manager Analysis
@@ -108,17 +261,24 @@ class WorkflowManager:
# Get detected language and set it in the serviceBase interface
self.checkExitCriteria(workflow)
userLanguage = projectManagerResponse.get("userLanguage", "en")
+ workflow.userLanguage = userLanguage
self.service.functions.setUserLanguage(userLanguage)
# Save the response as a message in the workflow and add log entries
self.checkExitCriteria(workflow)
responseMessage = ChatMessage(
- role="assistant",
+ id=str(uuid.uuid4()),
+ workflowId=workflow.id,
agentName="Project Manager",
message=objUserResponse,
- status="step" # As per state machine specification
+ role="assistant",
+ status="step",
+ sequenceNr=len(workflow.messages) + 1,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=True
)
- self.messageAdd(workflow, responseMessage)
+ workflow.messages.append(responseMessage)
# Add detailed log entry about the task plan
taskPlanLog = "Input: "
@@ -186,7 +346,7 @@ class WorkflowManager:
self.logAdd(workflow, "Creating final response", level="info", progress=90)
finalMessage = await self.generateFinalMessage(objUserResponse, objFinalDocuments, objResults)
finalMessage.status = "last" # As per state machine specification
- self.messageAdd(workflow, finalMessage)
+ workflow.messages.append(finalMessage)
# State 7: Workflow Completion
self.checkExitCriteria(workflow)
@@ -196,13 +356,21 @@ class WorkflowManager:
endTime = time.time()
workflow.stats.processingTime = endTime - startTime
+ # Update workflow in database
+ self.service.functions.updateWorkflow(workflow.id, {
+ "status": workflow.status,
+ "lastActivity": workflow.lastActivity,
+ "stats": workflow.stats.model_dump(),
+ "messages": [msg.model_dump() for msg in workflow.messages]
+ })
+
return workflow
except Exception as e:
# State 2: Workflow Exception
logger.error(f"Workflow processing error: {str(e)}", exc_info=True)
workflow.status = "failed"
- workflow.lastActivity = datetime.now().isoformat()
+ workflow.lastActivity = datetime.now(UTC).isoformat()
# Update processing time even on error
endTime = time.time()
@@ -458,7 +626,7 @@ JSON_OUTPUT = {{
async def agentProcessing(self, task: Dict[str, Any], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
"""
Process a single agent task from the workflow (State 5: Agent Execution).
- Optimized for the task-based approach where all agents implement processTask.
+ Uses the new Task and AgentResponse models.
Args:
task: The task definition containing agent name, prompt, and document specifications
@@ -467,139 +635,53 @@ JSON_OUTPUT = {{
Returns:
List of document objects created by the agent
"""
- # 1. Extract task information
- agentName = task.get("agent")
- agentPrompt = task.get("prompt", "")
-
- # Get agent from registry
- agent = self.agentRegistry.getAgent(agentName)
- if not agent:
- logger.error(f"Agent '{agentName}' not found")
- return []
- agentLabel = agent.label
-
- # Set workflow manager reference on the agent
- agent.workflowManager = self
-
- # Log the current step
- outputLabels = []
- for doc in task.get("outputDocuments", []):
- outputLabels.append(doc.get("label", "unknown"))
-
- stepInfo = f"Agent {agentLabel} to create {', '.join(outputLabels)}."
- self.logAdd(workflow, stepInfo, level="info")
-
- # Check if prompt is empty
- if agentPrompt == "":
- logger.warning("Empty prompt, no task to do")
- return []
-
- # Prepare output document specifications
- outputSpecs = []
- for doc in task.get("outputDocuments", []):
- outputSpec = {
- "label": doc.get("label"),
- "description": doc.get("prompt", "")
- }
- outputSpecs.append(outputSpec)
-
- # Prepare input documents for the agent
- inputDocuments = await self.prepareAgentInputDocuments(task.get('inputDocuments', []), workflow)
-
- # Create a standardized task object for the agent as per state machine spec
- agentTask = {
- "taskId": str(uuid.uuid4()),
- "workflowId": workflow.id,
- "prompt": agentPrompt,
- "inputDocuments": inputDocuments,
- "outputSpecifications": outputSpecs,
- "context": {
- "workflow": workflow, # Add the complete workflow object
- "workflowRound": workflow.currentRound,
- "agentType": agentName,
- "timestamp": datetime.now().isoformat(),
- "language": self.service.functions.userLanguage # Pass language to agent
- }
- }
-
- # Execute the agent with the standardized task
try:
- # Process the task using the agent's standardized interface
- logger.debug("TASK: "+self.parseJson2text(agentTask))
+ # Create Task object
+ task_obj = Task(
+ id=str(uuid.uuid4()),
+ workflowId=workflow.id,
+ agentName=task.get("agent"),
+ status="pending",
+ progress=0.0,
+ prompt=task.get("prompt", ""),
+ filesInput=task.get("inputDocuments", []),
+ filesOutput=task.get("outputDocuments", []),
+ userLanguage=workflow.userLanguage
+ )
- # Ensure AI service is available
- if not self.service.functions.aiService:
- logger.error("AI service not available in LucyDOM interface")
- self.logAdd(workflow, "Error: AI service not available", level="error")
- return []
-
- # Calculate bytes sent before processing
- bytesSent = len(json.dumps(agentTask).encode('utf-8'))
- for doc in inputDocuments:
- if doc.get('data'):
- bytesSent += len(doc['data'].encode('utf-8'))
- for content in doc.get('contents', []):
- if content.get('data'):
- bytesSent += len(content['data'].encode('utf-8'))
-
- # Process the task
- startTime = time.time()
- agentResults = await agent.processTask(agentTask)
- endTime = time.time()
+ # Execute agent
+ response, updated_task = await self.agentManager.executeAgent(task_obj)
- # Calculate bytes received
- bytesReceived = len(json.dumps(agentResults).encode('utf-8'))
- for doc in agentResults.get('documents', []):
- if doc.get('content'):
- bytesReceived += len(doc['content'].encode('utf-8'))
-
- # Calculate tokens used (now using bytes)
- tokensUsed = bytesSent + bytesReceived
+ # Update workflow stats
+ if response.performance:
+ workflow.stats.tokensUsed += response.performance.get("tokensUsed", 0)
+ workflow.stats.bytesSent += response.performance.get("bytesSent", 0)
+ workflow.stats.bytesReceived += response.performance.get("bytesReceived", 0)
- # Update workflow statistics
- if 'stats' not in workflow:
- workflow.stats = ChatStat(
- bytesSent=0,
- bytesReceived=0,
- tokensUsed=0,
- processingTime=0
- )
-
- workflow.stats.bytesSent += bytesSent
- workflow.stats.bytesReceived += bytesReceived
- workflow.stats.tokensUsed += tokensUsed
- workflow.stats.processingTime += (endTime - startTime)
-
# Update in database
self.service.functions.updateWorkflow(workflow.id, {
"stats": workflow.stats.model_dump()
})
-
- logger.debug(f"Agent '{agentName}' completed task. RESULT: {self.parseJson2text(agentResults)}")
-
+
# Log the agent response
self.logAdd(
workflow,
- f"Agent {agentLabel} completed task. Feedback: {agentResults.get('feedback', 'No feedback provided')}",
+ f"Agent {task.get('agent')} completed task. Feedback: {response.message.message if response.message else 'No feedback provided'}",
level="info"
)
- # Store produced files and prepare input object for message
- agentInputs = {
- "prompt": agentResults.get("feedback", ""),
- "listFileId": self.saveAgentDocuments(agentResults)
- }
-
# Create a message in the workflow with the agent's response
- agentMessage = await self.chatMessageToWorkflow("assistant", agent, agentInputs, workflow)
+ agentMessage = await self.chatMessageToWorkflow("assistant", task.get("agent"), {
+ "prompt": response.message.message if response.message else "",
+ "listFileId": response.message.documents if response.message else []
+ }, workflow)
agentMessage.status = "step" # As per state machine specification
- logger.debug(f"Agent result = {self.parseJson2text(agentMessage)}.")
return agentMessage.documents
except Exception as e:
- errorMsg = f"Error executing agent '{agentLabel}': {str(e)}"
- logger.error(errorMsg, exc_info=True) # Add exc_info=True to get full traceback
+ errorMsg = f"Error executing agent '{task.get('agent')}': {str(e)}"
+ logger.error(errorMsg, exc_info=True)
self.logAdd(workflow, errorMsg, level="error")
return []
@@ -722,24 +804,25 @@ filesDelivered = {self.parseJson2text(matchingDocuments)}
return f"[{role} {agentName}]: {contentSummary}{docsSummary}"
- async def chatMessageToWorkflow(self, role: str, agent: Dict[str, Any], chatMessage: Dict[str, Any], workflow: ChatWorkflow) -> ChatMessage:
+ async def chatMessageToWorkflow(self, role: str, agent: Union[str, Dict[str, Any]], chatMessage: Dict[str, Any], workflow: ChatWorkflow) -> ChatMessage:
"""
- Integrates user inputs into a Message object including files with complete contents (State 3: User Message Processing).
+ Integrates user inputs into a Message object including files with complete contents.
+ Uses DocumentManager for file processing.
Args:
role: Role of the message sender ('user' or 'assistant')
- agentName: Name of the agent, if message is from an agent
+ agent: Agent name or object
chatMessage: Input data with "prompt"=str, "listFileId"=[]
workflow: Current workflow object
Returns:
Message object with content and documents including contents
"""
- agentName = "" if agent is None else agent.name
- agentLabel = "" if agent is None else agent.label
+ agentName = agent if isinstance(agent, str) else agent.name if agent else ""
+ agentLabel = agent.label if hasattr(agent, 'label') else agentName
+
logger.info(f"Message from {role} {agentName} sent with {len(chatMessage.get('listFileId', []))} documents")
- logger.debug(f"message = {self.parseJson2text(chatMessage)}.")
-
+
# Check message content
messageContent = chatMessage.get("prompt", "")
if isinstance(messageContent, dict) and "content" in messageContent:
@@ -752,288 +835,33 @@ filesDelivered = {self.parseJson2text(matchingDocuments)}
# Process additional files with complete contents
additionalFileIds = chatMessage.get("listFileId", [])
- additionalFiles = await self.processFileIds(additionalFileIds)
+ additionalFiles = await self.documentManager.processFileIds(additionalFileIds)
# Create message object
messageObject = ChatMessage(
- role=role,
+ id=str(uuid.uuid4()),
+ workflowId=workflow.id,
agentName=agentLabel,
- content=messageContent,
- documents=additionalFiles,
- status=chatMessage.get("status", "step")
+ message=messageContent,
+ role=role,
+ status=chatMessage.get("status", "step"),
+ sequenceNr=len(workflow.messages) + 1,
+ startedAt=datetime.now(UTC).isoformat(),
+ finishedAt=datetime.now(UTC).isoformat(),
+ success=True,
+ documents=additionalFiles
)
-
- messageObject = self.messageAdd(workflow, messageObject)
- logger.debug(f"message_user = {self.parseJson2text(messageObject)}.")
-
- # Update statistics for user input
- if role == "user":
- # Calculate bytes sent
- bytesSent = len(messageContent.encode('utf-8'))
- for doc in additionalFiles:
- if doc.get('data'):
- bytesSent += len(doc['data'].encode('utf-8'))
- for content in doc.get('contents', []):
- if content.get('data'):
- bytesSent += len(content['data'].encode('utf-8'))
-
- # Calculate tokens used (now using bytes)
- tokensUsed = bytesSent
-
- # Update workflow statistics
- if 'stats' not in workflow:
- workflow.stats = ChatStat(
- bytesSent=0,
- bytesReceived=0,
- tokensUsed=0,
- processingTime=0
- )
-
- workflow.stats.bytesSent += bytesSent
- workflow.stats.tokensUsed += tokensUsed
-
- # Update in database
- self.service.functions.updateWorkflow(workflow.id, {
- "stats": workflow.stats.model_dump()
- })
-
+
+ # Add message to workflow
+ workflow.messages.append(messageObject)
+
+ # Update workflow in database
+ self.service.functions.updateWorkflow(workflow.id, {
+ "messages": [msg.model_dump() for msg in workflow.messages]
+ })
+
return messageObject
- async def processFileIds(self, fileIds: List[int]) -> List[Dict[str, Any]]:
- """
- Processes a list of File-IDs and returns the corresponding file objects as a list of Document objects.
- Loads all contents directly and adds summaries to each content item.
- Now properly handles the base64Encoded flag.
-
- Args:
- fileIds: List of file IDs
-
- Returns:
- List of Document objects with contents, summaries, and base64Encoded flags
- """
- documents = []
- logger.info(f"Processing {len(fileIds)} files")
-
- for fileId in fileIds:
- try:
- # Check if the file exists
- file = self.service.functions.getFile(fileId)
- if not file:
- logger.warning(f"File with ID {fileId} not found")
- continue
-
- # Check if file belongs to the current mandate
- if file.get("mandateId") != self.functions.mandateId:
- logger.warning(f"File {fileId} does not belong to mandate {self.functions.mandateId}")
- continue
-
- # Load file content
- fileContent = self.service.functions.getFileData(fileId)
- if fileContent is None:
- logger.warning(f"No content found for file with ID {fileId}")
- continue
-
- # Determine if file is text or binary based on MIME type
- mimeType = file.get("mimeType", "application/octet-stream")
- isTextFormat = isTextMimeType(mimeType)
-
- # Get file data from database
- fileDataEntries = self.service.functions.db.getRecordset("fileData", recordFilter={"id": fileId})
- base64Encoded = False
-
- if fileDataEntries and "base64Encoded" in fileDataEntries[0]:
- # Use the flag from the database
- base64Encoded = fileDataEntries[0]["base64Encoded"]
- else:
- # Determine based on file type (fallback for older data)
- base64Encoded = not isTextFormat
-
- # Convert to base64 for document storage
- encodedData = ""
-
- if base64Encoded:
- # Already base64 encoded in database
- encodedData = base64.b64encode(fileContent).decode('utf-8')
- else:
- # Text file - convert to string if it's bytes
- if isinstance(fileContent, bytes):
- try:
- fileContentStr = fileContent.decode('utf-8')
- encodedData = fileContentStr
- except UnicodeDecodeError:
- # Failed to decode as text, use base64
- encodedData = base64.b64encode(fileContent).decode('utf-8')
- base64Encoded = True
- else:
- # Already a string
- encodedData = fileContent
-
- # Create document
- fileNameExt = file.get("name")
- document = ChatDocument(
- id=f"doc_{str(uuid.uuid4())}",
- fileId=fileId,
- name=os.path.splitext(fileNameExt)[0] if os.path.splitext(fileNameExt)[0] else "noname",
- ext=os.path.splitext(fileNameExt)[1][1:] if os.path.splitext(fileNameExt)[1] else "bin",
- mimeType=mimeType,
- data=encodedData,
- base64Encoded=base64Encoded,
- metadata={
- "isText": isTextFormat,
- "base64Encoded": base64Encoded # For backward compatibility
- },
- contents=[]
- )
-
- # Extract contents
- contents = getDocumentContents(file, fileContent)
-
- # Add summaries to each content item
- for content in contents:
- content["summary"] = await self.getContentExtraction(content)
-
- # Ensure base64Encoded flag is set
- if "base64Encoded" not in content:
- # Use the flag from metadata if available
- content["base64Encoded"] = content.get("metadata", {}).get("base64Encoded", not content.get("metadata", {}).get("isText", False))
-
- document.contents = contents
-
- logger.info(f"File {file.name} (ID: {fileId}) loaded with {len(contents)} contents and summaries")
- documents.append(document)
-
- except Exception as e:
- logger.error(f"Error processing file {fileId}: {str(e)}")
- # Continue with remaining files instead of failing
- continue
-
- return documents
-
- async def prepareAgentInputDocuments(self, docInputList: List[Dict[str, Any]], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
- """
- Prepares input documents for an agent, sorted with newest first.
-
- Args:
- docInputList: List of required input documents as specified by the project manager
- workflow: Workflow object
-
- Returns:
- Prepared input documents for the agent, sorted with newest first
- """
- preparedInputs = []
-
- # Sort workflow messages by sequence number (descending)
- sortedMessages = sorted(
- workflow.messages,
- key=lambda m: m.sequenceNo,
- reverse=True
- )
-
- for docSpec in docInputList:
- docFilename = docSpec.get("label", "")
- docFileId = docSpec.get("fileId", "")
-
- foundDoc = None
- # Search for the document in sorted workflow messages (newest first)
- for message in sortedMessages:
- for doc in message.documents:
- if (docFileId != "" and docFileId == doc.fileId) or (docFilename != "" and self.getFilename(doc) == docFilename):
- foundDoc = doc
- break
- if foundDoc:
- break
- if foundDoc:
- # Process document for agent based on the specification
- processedDoc = await self.processDocumentForAgent(foundDoc, docSpec)
-
- preparedInputs.append(processedDoc)
- else:
- logger.warning(f"Document with label '{docFilename}', fileId '{docFileId}' not found in workflow")
-
- return preparedInputs
-
- async def processDocumentForAgent(self, document: ChatDocument, docSpec: Dict[str, Any]) -> ChatDocument:
- """
- Processes a document for an agent based on the document specification.
- Uses AI to extract relevant content from the document based on the specification.
-
- Args:
- document: The document to process
- docSpec: The document specification from the project manager
-
- Returns:
- Processed document with AI-extracted content
- """
- processedDoc = document.copy()
- partSpec = docSpec.get("contentPart", "")
-
- # Process each content item in the document
- if "contents" in processedDoc:
- processedContents = []
-
- for content in processedDoc["contents"]:
- # Check if part required
- if partSpec != "" and partSpec != content.name:
- continue
-
- # Get the prompt from the document specification
- summary = docSpec.get("prompt", "Extract the relevant information from this document")
-
- # Process content using the shared helper function
- processedContent = content.copy()
- processedContent["dataExtracted"] = await self.getContentExtraction(content, summary)
- processedContent["metadata"]["aiProcessed"] = True
-
- processedContents.append(processedContent)
-
- processedDoc["contents"] = processedContents
-
- return processedDoc
-
- async def getContentExtraction(self, content: Dict[str, Any], prompt: str = None) -> str:
- """
- Helper function that extracts or summarizes content based on its encoding.
- For base64 encoded content, uses callAi4Image. For non-base64 content, uses callAi.
-
- Args:
- content: Content item to analyze
- prompt: Custom prompt for extraction (default prompts used if not provided)
-
- Returns:
- Extracted or summarized content as text
- """
- try:
- # Get content data and encoding status
- data = content.get("data", "")
- isBase64 = content.get("base64Encoded", False)
-
- # Default prompts if none provided
- if prompt is None:
- textPrompt = "Create a very concise summary (1-2 sentences, maximum 200 characters) about this content."
- imagePrompt = "Create a very concise summary (1-2 sentences, maximum 200 characters) about this image."
- else:
- textPrompt = prompt
- imagePrompt = prompt
-
- # Handle base64 encoded content
- if isBase64:
- try:
- # Pass base64 encoded data directly to callAi4Image
- return await self.service.functions.callAi4Image(data, content.mimeType, imagePrompt)
- except Exception as e:
- logger.error(f"Error processing base64 content: {str(e)}")
- return f"Error processing content: {str(e)}"
- else:
- # For non-base64 content, use callAi
- return await self.service.functions.callAi([
- {"role": "system", "content": "You are a content analyzer. Extract relevant information from the provided content."},
- {"role": "user", "content": f"{textPrompt}\n\nContent:\n{data}"}
- ], produceUserAnswer=True)
-
- except Exception as e:
- logger.error(f"Error processing content: {str(e)}")
- return f"Error processing content: {str(e)}"
-
def messageAdd(self, workflow: ChatWorkflow, message: ChatMessage) -> ChatMessage:
"""
Adds a message to the workflow and updates lastActivity.
@@ -1350,7 +1178,7 @@ filesDelivered = {self.parseJson2text(matchingDocuments)}
Returns:
List with information about all available agents
"""
- return self.agentRegistry.getAgentInfos()
+ return self.agentManager.getAgentInfos()
def getFilename(self, document: ChatDocument) -> str:
"""
diff --git a/notes/changelog.txt b/notes/changelog.txt
index cffe2509..1f195f36 100644
--- a/notes/changelog.txt
+++ b/notes/changelog.txt
@@ -1,39 +1,6 @@
....................... TASKS
-WORKFLOW TO ENHANCE WITH self.service container --> let AI define it, then to initialize it for a workflow class
-
-WORKFLOW: To create model environment: The BUILDING BLOCKS
-
-self.service:
-- user
- - attributes (items)
- - connection (list)
-- functions (serviceManagementClass instance)
-- operator:
- - for each (list of references)
- - aiCall
- - extract(file) -> content
- - fileref agent 2 fileid
- - fileid 2 fileref agent
- - convert(data, format)
- - create agent input file list
- - save agent output files
-
-- workflow
- - active task (reference)
- - id
- - progress
- - status
- - tasks (list of tasks)
- - id
- - input data?
- - output data?
- -
-
-
-
-
Walkthroughs:
- register
- login local