gateway/modules/workflowManager.py
2025-04-26 02:13:22 +02:00

1236 lines
No EOL
50 KiB
Python

"""
Workflow Manager Module for state machine-based backend chat workflow.
Implements the state machine as defined in the documentation.
"""
import asyncio
import os
import logging
import json
import re
import uuid
import base64
from datetime import datetime
from typing import Dict, Any, List, Optional, Union, Tuple
# Required imports
from modules.workflowAgentsRegistry import getAgentRegistry
from modules.lucydomInterface import getLucydomInterface as domInterface
from modules.documentProcessor import getDocumentContents
# Configure logger
logger = logging.getLogger(__name__)
# Global settings for the workflow management
GLOBAL_WORKFLOW_LABELS = {
"systemName": "AI Assistant", # Default system name for logs
"workflowStatusMessages": {
"init": "Workflow initialized",
"running": "Running workflow",
"waiting": "Waiting for input",
"completed": "Workflow completed successfully",
"stopped": "Workflow stopped by user",
"failed": "Error in workflow"
}
}
class WorkflowManager:
"""
Manages the processing of chat requests, agent execution, and
the integration of results into the workflow, following a state machine approach.
"""
def __init__(self, mandateId: int, userId: int):
"""
Initializes the WorkflowManager with mandate and user context.
Args:
mandateId: ID of the current mandate
userId: ID of the current user
"""
self.mandateId = mandateId
self.userId = userId
self.mydom = domInterface(mandateId, userId)
self.agentRegistry = getAgentRegistry()
self.agentRegistry.setMydom(self.mydom)
### Workflow State Machine Implementation
async def workflowStart(self, userInput: Dict[str, Any], workflowId: Optional[str] = None) -> Dict[str, Any]:
"""
Main entry point for starting or continuing a workflow (State 1: Workflow Initialization).
Initializes a new workflow or loads an existing one based on workflowId.
Args:
userInput: User input with prompt and optional file list
workflowId: Optional workflow ID to continue an existing workflow
Returns:
Initialized workflow object with status "running"
"""
# 1. Initialize workflow or load existing one
workflow = self.workflowInit(workflowId)
self.logAdd(workflow, "Starting workflow processing", level="info", progress=0)
# Start asynchronous processing
asyncio.create_task(self.workflowProcess(userInput, workflow))
return workflow
async def workflowStop(self, workflowId: str) -> Dict[str, Any]:
"""
Stops a running workflow (State 8: Workflow Stopped).
Sets status to "stopped" and adds a log entry.
Args:
workflowId: ID of the workflow to stop
Returns:
Updated workflow with status="stopped"
"""
workflow = self.mydom.loadWorkflowState(workflowId)
if not workflow:
return {"error": "Workflow not found", "status": "failed"}
# Update status to stopped
workflow["status"] = "stopped"
workflow["lastActivity"] = datetime.now().isoformat()
# Update in database
self.mydom.updateWorkflow(workflowId, {
"status": workflow["status"],
"lastActivity": workflow["lastActivity"]
})
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["stopped"], level="info", progress=100)
return workflow
async def workflowProcess(self, userInput: Dict[str, Any], workflow: Dict[str, Any]) -> Dict[str, Any]:
"""
Main processing function that implements the workflow state machine.
Handles the complete workflow process from user input to final response.
Args:
userInput: User input with prompt and optional file list
workflow: Current workflow object
Returns:
Updated workflow with processing results
"""
try:
# State 3: User Message Processing
messageUser = await self.chatMessageToWorkflow("user", "", userInput, workflow)
messageUser["status"] = "first" # For first message
# State 4: Project Manager Analysis
self.logAdd(workflow, "Analyzing request and planning work", level="info", progress=10)
projectManagerResponse = await self.projectManagerAnalysis(messageUser, workflow)
objFinalDocuments = projectManagerResponse.get("objFinalDocuments", [])
objWorkplan = projectManagerResponse.get("objWorkplan", [])
objUserResponse = projectManagerResponse.get("objUserResponse", "")
# Get detected language and set it in the mydom interface
userLanguage = projectManagerResponse.get("userLanguage", "en")
self.mydom.setUserLanguage(userLanguage)
# Save the response as a message in the workflow and add log entries
responseMessage = {
"role": "assistant",
"agentName": "project_manager",
"content": objUserResponse,
"status": "step" # As per state machine specification
}
self.messageAdd(workflow, responseMessage)
self.logAdd(workflow, f"Planned outputs: {len(objFinalDocuments)} documents", level="info", progress=20)
self.logAdd(workflow, f"Work plan created with {len(objWorkplan)} steps", level="info", progress=25)
# State 5: Agent Execution
objResults = []
if objWorkplan:
totalTasks = len(objWorkplan)
for taskIndex, task in enumerate(objWorkplan):
agentName = task.get("agent", "unknown")
progressValue = 30 + int((taskIndex / totalTasks) * 60) # Progress from 30% to 90%
progressMsg = f"Running task {taskIndex+1}/{totalTasks}: {agentName}"
self.logAdd(workflow, progressMsg, level="info", progress=progressValue)
taskResults = await self.agentProcessing(task, workflow)
objResults.extend(taskResults)
# Log completion of this task
self.logAdd(
workflow,
f"Completed task {taskIndex+1}/{totalTasks}: {agentName}",
level="info",
progress=progressValue + (60/totalTasks)/2
)
# State 6: Final Response Generation
self.logAdd(workflow, "Creating final response", level="info", progress=90)
finalMessage = await self.generateFinalMessage(objUserResponse, objFinalDocuments, objResults)
finalMessage["status"] = "last" # As per state machine specification
self.messageAdd(workflow, finalMessage)
# State 7: Workflow Completion
self.workflowFinish(workflow)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["completed"], level="info", progress=100)
return workflow
except Exception as e:
# State 2: Workflow Exception
logger.error(f"Workflow processing error: {str(e)}", exc_info=True)
workflow["status"] = "failed"
workflow["lastActivity"] = datetime.now().isoformat()
# Update in database
self.mydom.updateWorkflow(workflow["id"], {
"status": "failed",
"lastActivity": workflow["lastActivity"]
})
self.logAdd(workflow, f"Workflow failed: {str(e)}", level="error", progress=100)
return workflow
def workflowInit(self, workflowId: Optional[str] = None) -> Dict[str, Any]:
"""
Initializes a workflow or loads an existing one with round counting (State 1: Workflow Initialization).
Args:
workflowId: Optional - ID of the workflow to load
Returns:
Initialized workflow object
"""
currentTime = datetime.now().isoformat()
if workflowId is None or not self.mydom.getWorkflow(workflowId):
# Create new workflow
newWorkflowId = str(uuid.uuid4()) if workflowId is None else workflowId
workflow = {
"id": newWorkflowId,
"mandateId": self.mandateId,
"userId": self.userId,
"name": f"Workflow {newWorkflowId[:8]}",
"startedAt": currentTime,
"messages": [], # Empty list - will be filled with references
"messageIds": [], # Initialize empty messageIds list
"logs": [],
"dataStats": {},
"currentRound": 1,
"status": "running",
"lastActivity": currentTime,
}
# Save to database - only the workflow metadata
workflowDb = {
"id": workflow["id"],
"mandateId": workflow["mandateId"],
"userId": workflow["userId"],
"name": workflow["name"],
"startedAt": workflow["startedAt"],
"status": workflow["status"],
"dataStats": workflow["dataStats"],
"currentRound": workflow["currentRound"],
"lastActivity": workflow["lastActivity"],
"messageIds": workflow["messageIds"] # Include messageIds
}
self.mydom.createWorkflow(workflowDb)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["init"], level="info", progress=0)
return workflow
else:
# State 10: Workflow Resumption - Load existing workflow
workflow = self.mydom.loadWorkflowState(workflowId)
# Ensure messageIds exists
if "messageIds" not in workflow:
# Initialize from existing messages
workflow["messageIds"] = [msg["id"] for msg in workflow.get("messages", [])]
# Update in database
self.mydom.updateWorkflow(workflowId, {"messageIds": workflow["messageIds"]})
# Update status and increment round counter
workflow["status"] = "running"
workflow["lastActivity"] = currentTime
# Increment currentRound if it exists, otherwise set it to 1
if "currentRound" in workflow:
workflow["currentRound"] += 1
else:
workflow["currentRound"] = 1
# Update in database - only the relevant workflow fields
workflowUpdate = {
"status": workflow["status"],
"lastActivity": workflow["lastActivity"],
"currentRound": workflow["currentRound"]
}
self.mydom.updateWorkflow(workflowId, workflowUpdate)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["running"], level="info", progress=0)
return workflow
def workflowFinish(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
"""
Finalizes a workflow and sets the status to 'completed' (State 7: Workflow Completion).
Args:
workflow: Workflow object
Returns:
Updated workflow object
"""
# Prepare workflow update data
workflowUpdate = {
"status": "completed",
"lastActivity": datetime.now().isoformat(),
}
# Update the workflow object in memory
workflow["status"] = workflowUpdate["status"]
workflow["lastActivity"] = workflowUpdate["lastActivity"]
# Save workflow state to database - only relevant fields
self.mydom.updateWorkflow(workflow["id"], workflowUpdate)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["completed"], level="info", progress=100)
return workflow
async def projectManagerAnalysis(self, messageUser: Dict[str, Any], workflow: Dict[str, Any]) -> Dict[str, Any]:
"""
Creates the prompt for the project manager and processes the response (State 4: Project Manager Analysis).
Args:
messageUser: Message object with user request
workflow: Current workflow object
Returns:
Project manager's response with objFinalDocuments, objWorkplan and objUserResponse
"""
# Get available agents with their capabilities
availableAgents = self.agentProfiles()
# Create a workflow summary
workflowSummary = await self.workflowSummarize(workflow, messageUser)
# Create a list of currently available documents from user input or previously generated documents
availableDocuments = self.getAvailableDocuments(workflow, messageUser)
availableDocsStr = json.dumps(availableDocuments, indent=2)
# Create the prompt for the project manager with language detection requirement
prompt = f"""
Based on the user request and the provided documents, please analyze the requirements and create a processing plan.
Also, identify the language of the user's request and include it in your response.
<userrequest>
{messageUser.get('content')}
</userrequest>
# Previous conversation history:
{workflowSummary}
# Available documents (currently in workflow):
{availableDocsStr}
# Available agents and their capabilities:
{self.parseJson2text(availableAgents)}
Please analyze the request and create:
1. A list of required result documents (objFinalDocuments)
2. A plan for executing agents (objWorkplan)
3. A clear response to the user explaining what you're doing (objUserResponse)
4. Identified language of the user's request (userLanguage)
## IMPORTANT RULES FOR THE WORKPLAN:
1. Each input document must either already exist (provided by the user or previously created by an agent) or be created by an agent before it's used.
2. If necessary, convert input documents to a suitable format using agents when the type doesn't match.
3. Do not define document inputs that don't exist or haven't been generated beforehand.
4. Create a logical sequence - earlier agents can create documents that are later used as inputs.
5. If the user has provided documents but hasn't clearly stated what they want, try to act according to the context.
Your answer must be strictly in the JSON_OUTPUT format, with no additions before or after the JSON object.
JSON_OUTPUT = {{
"objFinalDocuments": ["label",...], # document label in the format 'filename.ext'
"objWorkplan": [
{{
"agent": "agent_name", # Name of an available agent
"prompt": "Specific instructions to the agent, that he knows what to do with which documents and which output to provide."
"outputDocuments": [
{{
"label":"document label in the format 'filename.ext'",
"prompt":"AI prompt to describe the content of the file"
}}
],
"inputDocuments": [
{{
"label":"document label in the format 'filename.ext'",
"fileId":id, # if refering to an existing document, provide fileId to select the correct file
"contentPart":"", # provide empty string, if all document contents to consider, otherwise the contentPart of the document to focus on
"prompt":"AI prompt to describe what data to extract from the file."
}}
], # If no input documents are needed, include "inputDocuments" as an empty list
}}
# Multiple agent tasks can be added here and should build logically on each other
],
"objUserResponse": "Information to the user about how his request will be solved, in the language of the user's request.",
"userLanguage": "en" # Language code (e.g., en, de, fr, es) based on the user's request
}}
## RULES for inputDocuments:
1. The user request refers to documents where "fileSource" in available documents is "user". Those documents are in the focus for input
2. In case of redundant label in available documents, use document with highest sequenceNr if not specified differently
## STRICT RULES FOR document "label":
1. Every document label MUST include a proper file extension that matches the content type.
2. Use standard extensions like:
- ".txt" for text files
- ".md" for markdown files
- ".csv" for comma-separated values
- ".json" for JSON data
- ".html" for HTML content
- ".jpg" or ".png" for images
- ".docx" for Word documents
- ".xlsx" for Excel files
- ".pdf" for PDF documents
3. Use descriptive filenames that indicate the document's purpose (e.g., "analysis_report.txt" rather than just "report.txt")
4. If you use label for an existing file
"""
# Call the AI service through mydom for language support
logger.debug(f"PROJECT MANAGER Planning prompt: {prompt}")
projectManagerOutput = await self.mydom.callAi([
{
"role": "system",
"content": "You are an experienced project manager who analyzes user requests and creates work plans. You pay very careful attention to ensure that all document dependencies are correct and that no non-existent documents are defined as inputs. The output follows strictly the specified format."
},
{
"role": "user",
"content": prompt
}
])
# Parse the JSON response
logger.debug(f"PROJECT MANAGER Planning answer: {projectManagerOutput}")
return self.parseJsonResponse(projectManagerOutput)
async def agentProcessing(self, task: Dict[str, Any], workflow: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Process a single agent task from the workflow (State 5: Agent Execution).
Optimized for the task-based approach where all agents implement processTask.
Args:
task: The task definition containing agent name, prompt, and document specifications
workflow: The current workflow object
Returns:
List of document objects created by the agent
"""
# 1. Extract task information
agentName = task.get("agent")
agentPrompt = task.get("prompt", "")
# Log the current step
outputLabels = []
for doc in task.get("outputDocuments", []):
outputLabels.append(doc.get("label", "unknown"))
stepInfo = f"Agent '{agentName}' to create {', '.join(outputLabels)}."
self.logAdd(workflow, stepInfo, level="info")
# Check if prompt is empty
if agentPrompt == "":
logger.warning("Empty prompt, no task to do")
return []
# Get agent from registry
agent = self.agentRegistry.getAgent(agentName)
if not agent:
logger.error(f"Agent '{agentName}' not found")
return []
# Prepare output document specifications
outputSpecs = []
for doc in task.get("outputDocuments", []):
outputSpec = {
"label": doc.get("label"),
"description": doc.get("prompt", "")
}
outputSpecs.append(outputSpec)
# Prepare input documents for the agent
inputDocuments = await self.prepareAgentInputDocuments(task.get('inputDocuments', []), workflow)
# Create a standardized task object for the agent as per state machine spec
agentTask = {
"taskId": str(uuid.uuid4()),
"workflowId": workflow.get("id"),
"prompt": agentPrompt,
"inputDocuments": inputDocuments,
"outputSpecifications": outputSpecs,
"context": {
"workflowRound": workflow.get("currentRound", 1),
"agentType": agentName,
"timestamp": datetime.now().isoformat(),
"language": self.mydom.userLanguage # Pass language to agent
}
}
# Execute the agent with the standardized task
try:
# Process the task using the agent's standardized interface
logger.debug("TASK: "+self.parseJson2text(agentTask))
logger.debug(f"Agent '{agentName}' AI service available: {agent.mydom is not None}")
agentResults = await agent.processTask(agentTask)
logger.debug(f"Agent '{agentName}' completed task. RESULT: {self.parseJson2text(agentResults)}")
# Log the agent response
self.logAdd(
workflow,
f"Agent '{agentName}' completed task. Feedback: {agentResults.get('feedback', 'No feedback provided')}",
level="info"
)
# Store produced files and prepare input object for message
agentInputs = {
"prompt": agentResults.get("feedback", ""),
"listFileId": self.saveAgentDocuments(agentResults)
}
# Create a message in the workflow with the agent's response
agentMessage = await self.chatMessageToWorkflow("assistant", agentName, agentInputs, workflow)
agentMessage["status"] = "step" # As per state machine specification
logger.debug(f"Agent result = {self.parseJson2text(agentMessage)}.")
return agentMessage.get("documents", [])
except Exception as e:
errorMsg = f"Error executing agent '{agentName}': {str(e)}"
logger.error(errorMsg, exc_info=True) # Add exc_info=True to get full traceback
self.logAdd(workflow, errorMsg, level="error")
return []
async def generateFinalMessage(self, objUserResponse: str, objFinalDocuments: List[str], objResults: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Creates the final response message with review of promised and delivered documents (State 6: Final Response Generation).
Args:
objUserResponse: Initial text response to the user
objFinalDocuments: List of expected response documents
objResults: List of generated result documents
Returns:
Complete message object with content and relevant documents
"""
# Find documents that match the objFinalDocuments requirements
matchingDocuments = []
if len(objFinalDocuments) > 0:
for answerLabel in objFinalDocuments:
# Find matching document in results
for doc in objResults:
docName = self.getFilename(doc)
# Check if this document matches the answer specification
if docName == answerLabel:
contentRef = []
for c in doc.get("contents", []):
contentRef.append(c.get("summary", ""))
docRef = {
"label": docName,
"contentSummary": contentRef
}
matchingDocuments.append(docRef)
break
# Use the mydom for language-aware AI calls
finalPrompt = await self.mydom.callAi([
{"role": "system", "content": "You are a project manager, who delivers results to a user."},
{"role": "user", "content": f"""
Give the final short feedback to the user with reference to the initial statement (objUserResponse). Inform him about the list of filesDelivered. You do not need to send the files, this is handled separately. If in the list of filesDelivered there might miss some files_promised, just give a comment on this, otherwise task is now completed successful.
Here the data:
objUserResponse = {self.parseJson2text(objUserResponse)}
filesPromised = {self.parseJson2text(objFinalDocuments)}
filesDelivered = {self.parseJson2text(matchingDocuments)}
"""
}
], produceUserAnswer=True)
# Create basic message structure with proper fields
logger.debug(f"FINAL PROMPT = {self.parseJson2text(finalPrompt)}.")
finalMessage = {
"role": "assistant",
"agentName": "project_manager",
"content": finalPrompt,
"documents": [] # DO NOT include the results documents, already with agents
}
logger.debug(f"FINAL MESSAGE = {self.parseJson2text(finalMessage)}.")
return finalMessage
async def workflowSummarize(self, workflow: Dict[str, Any], messageUser: Dict[str, Any]) -> str:
"""
Creates a summary of the workflow without the current user message.
Args:
workflow: Workflow object
messageUser: Current user message
Returns:
Summary of the workflow
"""
if not workflow or "messages" not in workflow or not workflow["messages"]:
return "" # First message
# Go through messages in chronological order
messages = sorted(workflow["messages"], key=lambda m: m.get("sequenceNo", 0), reverse=False)
summaryParts = []
for message in messages:
if message["id"] != messageUser["id"]:
messageSummary = await self.messageSummarize(message)
summaryParts.append(messageSummary)
return "\n\n".join(summaryParts)
async def messageSummarize(self, message: Dict[str, Any]) -> str:
"""
Creates a summary of a message including its documents.
Args:
message: Message to summarize
Returns:
Summary of the message
"""
role = message.get("role", "undefined")
agentName = message.get("agentName", "")
content = message.get("content", "")
try:
# Use the mydom for language-aware AI calls
contentSummary = await self.mydom.callAi([
{"role": "system", "content": f"You are a chat message summarizer. Create a very concise summary (2-3 sentences, maximum 300 characters)"},
{"role": "user", "content": content}
])
except Exception as e:
logger.error(f"Error creating summary: {str(e)}")
contentSummary = content[:200] + "..."
# Summarize documents
docsSummary = ""
if "documents" in message and message["documents"]:
docsList = []
for i, doc in enumerate(message["documents"]):
docName = self.getFilename(doc)
docsList.append(docName)
if docsList:
docsSummary = "\nDocuments:" + "\n- ".join(docsList)
return f"[{role} {agentName}]: {contentSummary}{docsSummary}"
async def chatMessageToWorkflow(self, role: str, agentName: str, chatMessage: Dict[str, Any], workflow: Dict[str, Any]) -> Dict[str, Any]:
"""
Integrates user inputs into a Message object including files with complete contents (State 3: User Message Processing).
Args:
role: Role of the message sender ('user' or 'assistant')
agentName: Name of the agent, if message is from an agent
chatMessage: Input data with "prompt"=str, "listFileId"=[]
workflow: Current workflow object
Returns:
Message object with content and documents including contents
"""
logger.info(f"Message from {role} {agentName} sent with {len(chatMessage.get('listFileId', []))} documents")
logger.debug(f"message = {self.parseJson2text(chatMessage)}.")
# Check message content
messageContent = chatMessage.get("prompt", "")
if isinstance(messageContent, dict) and "content" in messageContent:
messageContent = messageContent["content"]
# If message content is empty, no chat
if role == "user" and (messageContent is None or messageContent.strip() == ""):
logger.warning(f"Empty message, no chat")
messageContent = "(No user input received)"
# Process additional files with complete contents
additionalFileIds = chatMessage.get("listFileId", [])
additionalFiles = await self.processFileIds(additionalFileIds)
# Create message object
messageObject = {
"role": role,
"agentName": agentName,
"content": messageContent,
"documents": additionalFiles
}
messageObject = self.messageAdd(workflow, messageObject)
logger.debug(f"message_user = {self.parseJson2text(messageObject)}.")
return messageObject
async def processFileIds(self, fileIds: List[int]) -> List[Dict[str, Any]]:
"""
Processes a list of File-IDs and returns the corresponding file objects as a list of Document objects.
Loads all contents directly and adds summaries to each content item.
Args:
fileIds: List of file IDs
Returns:
List of Document objects with contents and summaries
"""
documents = []
logger.info(f"Processing {len(fileIds)} files")
for fileId in fileIds:
try:
# Check if the file exists
file = self.mydom.getFile(fileId)
if not file:
logger.warning(f"File with ID {fileId} not found")
continue
# Check if file belongs to the current mandate
if file.get("mandateId") != self.mandateId:
logger.warning(f"File {fileId} does not belong to mandate {self.mandateId}")
continue
# Load file content
fileContent = self.mydom.getFileData(fileId)
if fileContent is None:
logger.warning(f"No content found for file with ID {fileId}")
continue
# Create document
fileNameExt = file.get("name")
document = {
"id": f"doc_{str(uuid.uuid4())}",
"fileId": fileId,
"name": os.path.splitext(fileNameExt)[0] if os.path.splitext(fileNameExt)[0] else "noname",
"ext": os.path.splitext(fileNameExt)[1][1:] if os.path.splitext(fileNameExt)[1] else "bin",
"data": base64.b64encode(fileContent).decode('utf-8'), # Add file data as base64
"contents": []
}
# Extract contents
contents = getDocumentContents(file, fileContent)
# Add summaries to each content item
for content in contents:
content["summary"] = await self.messageSummarizeContent(content)
document["contents"] = contents
logger.info(f"File {file.get('name', 'unnamed')} (ID: {fileId}) loaded with {len(contents)} contents and summaries")
documents.append(document)
except Exception as e:
logger.error(f"Error processing file {fileId}: {str(e)}")
# Continue with remaining files instead of failing
continue
return documents
async def prepareAgentInputDocuments(self, docInputList: List[Dict[str, Any]], workflow: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Prepares input documents for an agent, sorted with newest first.
Args:
docInputList: List of required input documents as specified by the project manager
workflow: Workflow object
Returns:
Prepared input documents for the agent, sorted with newest first
"""
preparedInputs = []
# Sort workflow messages by sequence number (descending)
sortedMessages = sorted(
workflow.get("messages", []),
key=lambda m: m.get("sequenceNo", 0),
reverse=True
)
for docSpec in docInputList:
docFilename = docSpec.get("label", "")
docFileId = docSpec.get("fileId", "")
foundDoc = None
# Search for the document in sorted workflow messages (newest first)
for message in sortedMessages:
for doc in message.get("documents", []):
if (docFileId != "" and docFileId == doc.get("fileId")) or (docFilename != "" and self.getFilename(doc) == docFilename):
foundDoc = doc
break
if foundDoc:
break
if foundDoc:
# Process document for agent based on the specification
processedDoc = await self.processDocumentForAgent(foundDoc, docSpec)
preparedInputs.append(processedDoc)
else:
logger.warning(f"Document with label '{docFilename}', fileId '{docFileId}' not found in workflow")
return preparedInputs
async def processDocumentForAgent(self, document: Dict[str, Any], docSpec: Dict[str, Any]) -> Dict[str, Any]:
"""
Processes a document for an agent based on the document specification.
Uses AI to extract relevant content from the document based on the specification.
Args:
document: The document to process
docSpec: The document specification from the project manager
Returns:
Processed document with AI-extracted content
"""
processedDoc = document.copy()
partSpec = docSpec.get("contentPart", "")
# Process each content item in the document
if "contents" in processedDoc:
processedContents = []
for content in processedDoc["contents"]:
# Check if part required
if partSpec != "" and partSpec != content.get("name"):
continue
# Get the data from the content
data = content.get("data", "")
processedContent = content.copy()
# Check if content data is base64 encoded
isBase64 = content.get("metadata", {}).get("base64Encoded", False)
try:
# Use the AI service to process the document content according to the prompt from the project manager for the document specification
summary = docSpec.get("prompt", "Extract the relevant information from this document")
aiPrompt = f"""
# Please process the following document content according to this instruction:
<instruction>
{summary}
</instruction>
# Document content:
<data>
{data}
</data>
# Extract and provide only the relevant information as requested.
"""
# Call the AI service through mydom for language support
processedData = await self.mydom.callAi([
{"role": "system", "content": "You are a document processing assistant. Extract only the relevant information as requested."},
{"role": "user", "content": aiPrompt}
])
# DO NOT change the original data field
# processedContent["data"] unchanged
processedContent["dataExtracted"] = processedData
processedContent["metadata"]["aiProcessed"] = True
except Exception as e:
logger.error(f"Error processing document content with AI: {str(e)}")
# Fall back to original content if AI processing fails
processedContent["dataExtracted"] = "(no information)"
processedContents.append(processedContent)
processedDoc["contents"] = processedContents
return processedDoc
async def messageSummarizeContent(self, content: Dict[str, Any]) -> str:
"""
Generates a summary for a content item using AI.
Args:
content: Content item to summarize (already processed by getDocumentContents)
Returns:
Brief summary of the content
"""
# Extract relevant information
data = content.get("data", "")
contentType = content.get("contentType", "text/plain")
isText = content.get("metadata", {}).get("isText", False)
try:
# Use the mydom for language-aware AI calls
summary = await self.mydom.callAi([
{"role": "system", "content": "You are a content summarizer. Create very concise summary (1-2 sentences, maximum 200 characters) about this file."},
{"role": "user", "content": f"Summarize this {contentType} content briefly:\n\n{data}"}
])
return summary
except Exception as e:
logger.error(f"Error generating content summary: {str(e)}")
return f"Text content ({contentType})"
def messageAdd(self, workflow: Dict[str, Any], message: Dict[str, Any]) -> Dict[str, Any]:
"""
Adds a message to the workflow and updates lastActivity.
Saves the message in the database and updates the workflow with references.
Args:
workflow: Workflow object
message: Message to be saved
Returns:
Added message
"""
currentTime = datetime.now().isoformat()
# Ensure messages list exists
if "messages" not in workflow:
workflow["messages"] = []
# Generate new message ID if not present
if "id" not in message:
message["id"] = f"msg_{str(uuid.uuid4())}"
# Add workflow ID and timestamps
message["workflowId"] = workflow["id"]
message["startedAt"] = currentTime
message["finishedAt"] = currentTime
# Set sequence number
message["sequenceNo"] = len(workflow["messages"]) + 1
# Ensure required fields are present
if "role" not in message:
# Set a default role based on agentName
message["role"] = "assistant" if message.get("agentName") else "user"
if "agentName" not in message:
message["agentName"] = ""
# Set status if not present
if "status" not in message:
message["status"] = "completed"
# Add message to workflow
workflow["messages"].append(message)
# Ensure messageIds list exists
if "messageIds" not in workflow:
workflow["messageIds"] = []
# Add message ID to the messageIds list
workflow["messageIds"].append(message["id"])
# Update workflow status
workflow["lastActivity"] = currentTime
# Save to database - first the message itself
self.mydom.createWorkflowMessage(message)
# Then save the workflow with updated references
workflowUpdate = {
"lastActivity": currentTime,
"messageIds": workflow["messageIds"] # Update the messageIds field
}
self.mydom.updateWorkflow(workflow["id"], workflowUpdate)
return message
def logAdd(self, workflow: Dict[str, Any], message: str, level: str = "info",
progress: Optional[int] = None) -> str:
"""
Adds a log entry to the workflow and also logs it in the logger.
Enhanced with standardized formatting and workflow status tracking.
Args:
workflow: Workflow object
message: Log message
level: Log level (info, warning, error)
progress: Optional - Progress value (0-100)
Returns:
ID of the created log entry
"""
# Ensure logs list exists
if "logs" not in workflow:
workflow["logs"] = []
# Generate log ID
logId = f"log_{str(uuid.uuid4())}"
# Get workflow status
workflowStatus = workflow.get("status", "running")
# Set agentName from global settings
agentName = GLOBAL_WORKFLOW_LABELS.get("systemName", "AI Assistant")
# Create log entry
logEntry = {
"id": logId,
"workflowId": workflow["id"],
"message": message,
"type": level,
"timestamp": datetime.now().isoformat(),
"agentName": agentName,
"status": workflowStatus
}
# Add progress if provided
if progress is not None:
logEntry["progress"] = progress
# Add log to workflow
workflow["logs"].append(logEntry)
# Save in database
self.mydom.createWorkflowLog(logEntry)
# Also log in logger
if level == "info":
logger.info(f"Workflow {workflow['id']}: {message}")
elif level == "warning":
logger.warning(f"Workflow {workflow['id']}: {message}")
elif level == "error":
logger.error(f"Workflow {workflow['id']}: {message}")
return logId
def saveAgentDocuments(self, agentResults: Dict[str, Any]) -> List[int]:
"""
Saves all documents from agent results as files and returns a list of file IDs.
Enhanced to handle the standardized document format from agents.
Args:
agentResults: Dictionary containing agent feedback and documents
Returns:
List of file IDs for the saved documents
"""
fileIds = []
# Extract documents from agent results
documents = agentResults.get("documents", [])
for doc in documents:
try:
# Extract label (filename) and content
label = doc.get("label", "unnamed_file.txt")
content = doc.get("content", "")
# Split label into name and extension
name, ext = os.path.splitext(label)
if ext.startswith('.'):
ext = ext[1:] # Remove leading dot
elif not ext:
# If no extension is provided, default to .txt for text content
ext = "txt"
label = f"{label}.{ext}"
# Determine if content is base64 encoded
isBase64 = False
if isinstance(content, dict) and content.get("metadata", {}).get("base64Encoded", False):
isBase64 = True
content = content.get("data", "")
# Convert content to bytes
if isinstance(content, str):
if isBase64:
# Decode base64 to bytes
try:
fileContent = base64.b64decode(content)
except Exception as e:
logger.warning(f"Failed to decode base64 content: {str(e)}")
fileContent = content.encode('utf-8')
else:
# Convert text to bytes
fileContent = content.encode('utf-8')
else:
# Already bytes
fileContent = content
# Save file to database
fileMeta = self.mydom.saveUploadedFile(fileContent, label)
if fileMeta and "id" in fileMeta:
fileId = fileMeta["id"]
fileIds.append(fileId)
logger.info(f"Saved document '{label}' with file ID: {fileId}")
else:
logger.warning(f"Failed to save document '{label}'")
except Exception as e:
logger.error(f"Error saving document from agent results: {str(e)}")
# Continue with other documents instead of failing
continue
return fileIds
def getAvailableDocuments(self, workflow: Dict[str, Any], messageUser: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Determines all currently available documents from user input and already generated documents.
Args:
messageUser: Current message from the user
workflow: Current workflow object
Returns:
List with information about all available documents, sorted by message sequenceNr in descending order
"""
availableDocs = []
if "messages" in workflow and workflow["messages"]:
for message in workflow["messages"]:
messageId = message.get("id", "unknown")
sequenceNr = message.get("sequenceNo", 0)
# Determine source
source = "user" if messageId == messageUser.get("id") else "workflow"
# Process documents in this message
if "documents" in message and message["documents"]:
for doc in message["documents"]:
# Get filename using our helper method
filename = self.getFilename(doc)
fileId = doc.get("fileId")
# Extract summaries from all contents
contentSummaries = []
for content in doc.get("contents", []):
contentSummaries.append({
"contentPart": content.get("name", "noname"),
"metadata": content.get("metadata", ""),
"summary": content.get("summary", "No summary"),
})
# Create document info
docInfo = {
"sequenceNr": sequenceNr,
"fileSource": source,
"fileId": fileId,
"messageId": messageId,
"label": filename,
"contentSummaryList": contentSummaries,
}
availableDocs.append(docInfo)
# Sort by message sequenceNr in descending order (newest first)
availableDocs.sort(key=lambda x: x["sequenceNr"], reverse=True)
logger.info(f"Available documents: {len(availableDocs)}")
return availableDocs
def agentProfiles(self) -> List[Dict[str, Any]]:
"""
Gets information about all available agents.
Returns:
List with information about all available agents
"""
return self.agentRegistry.getAgentInfos()
def getFilename(self, document: Dict[str, Any]) -> str:
"""
Gets the filename from a document by combining name and extension.
Args:
document: Document object
Returns:
Filename with extension
"""
name = document.get("name", "unnamed")
ext = document.get("ext", "")
if ext:
return f"{name}.{ext}"
return name
def parseJson2text(self, jsonObj: Any) -> str:
"""
Converts a JSON object to a readable text representation.
Args:
jsonObj: JSON object to convert
Returns:
Formatted text representation
"""
if not jsonObj:
return "No data available"
try:
# Format with indentation for better readability
return json.dumps(jsonObj, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Error in JSON conversion: {str(e)}")
return str(jsonObj)
def parseJsonResponse(self, responseText: str) -> Dict[str, Any]:
"""
Parses the JSON response from a text.
Args:
responseText: Text with JSON content
Returns:
Parsed JSON data
"""
try:
# Extract JSON from the text (if mixed with other content)
jsonStart = responseText.find('{')
jsonEnd = responseText.rfind('}') + 1
if jsonStart >= 0 and jsonEnd > jsonStart:
jsonStr = responseText[jsonStart:jsonEnd]
return json.loads(jsonStr)
else:
# Try to parse the entire text
return json.loads(responseText)
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error: {str(e)}")
# Fallback: Return empty structure
return {
"objFinalDocuments": [],
"objWorkplan": [],
"objUserResponse": "Sorry, I could not parse your data.",
"userLanguage": "en"
}
# Singleton factory for the WorkflowManager
_workflowManagers = {}
def getWorkflowManager(mandateId: int = 0, userId: int = 0) -> WorkflowManager:
"""
Returns a WorkflowManager for the specified context.
Reuses existing instances.
Args:
mandateId: ID of the mandate
userId: ID of the user
Returns:
WorkflowManager instance
"""
contextKey = f"{mandateId}_{userId}"
if contextKey not in _workflowManagers:
_workflowManagers[contextKey] = WorkflowManager(mandateId, userId)
return _workflowManagers[contextKey]