"""
Workflow Manager Module for state machine-based backend chat workflow.
Implements the state machine as defined in the documentation.
"""
import asyncio
import os
import logging
import json
import uuid
import base64
from datetime import datetime, UTC, timedelta
from typing import Dict, Any, List, Optional, Union, Tuple, Callable, TypedDict, Protocol
import time
from modules.shared.mimeUtils import isTextMimeType
# Required imports
from modules.workflow.agentManager import getAgentManager
from modules.workflow.taskManager import getTaskManager
from modules.workflow.documentManager import getDocumentManager
from modules.interfaces.serviceChatModel import (
UserInputRequest, ChatWorkflow, ChatMessage, ChatLog,
ChatDocument, ChatStat, Workflow, Task, AgentResponse
)
# Configure logger
logger = logging.getLogger(__name__)
# Global settings for the workflow management
GLOBAL_WORKFLOW_LABELS = {
"systemName": "AI Assistant", # Default system name for logs
"workflowStatusMessages": {
"init": "Workflow initialized",
"running": "Running workflow",
"waiting": "Waiting for input",
"completed": "Workflow completed successfully",
"stopped": "Workflow stopped by user",
"failed": "Error in workflow"
}
}
class WorkflowStoppedException(Exception):
"""Exception raised when a workflow is forcibly stopped with function checkExitCriteria() """
pass
class ServiceObject:
"""Service object structure available to agents."""
def __init__(self):
self.user: Dict[str, Any] = {} # User context
self.operator: Dict[str, Callable] = {} # Document operations
self.workflow: Dict[str, Any] = {} # Workflow context
self.functions: Any = None # Core functions
self.logAdd: Callable = None # Logging function
class WorkflowManager:
"""Manages the execution of workflows and their associated agents."""
def __init__(self, service: ServiceObject):
"""Initialize the workflow manager with service container."""
# Store service container
self.service = service
self.service.logAdd = self.logAdd
# Initialize managers
self.agentManager = getAgentManager()
self.taskManager = getTaskManager()
self.documentManager = getDocumentManager()
# Initialize managers with service
self.agentManager.initialize(service=self.service)
self.documentManager.initialize(service=self.service)
# Add agent service functionality directly to service object
service.user = {
'attributes': service.user.get('attributes', {}),
'connection': service.user.get('connection', [])
}
# Add operator functions
service.operator = {
'forEach': lambda items, func: [func(item) for item in items],
'aiCall': service.functions.callAi,
'extract': lambda file: self.documentManager.extractContent(file),
'fileRefToFileId': lambda ref: self.documentManager.convertFileRefToId(ref),
'fileIdToFileRef': lambda fileId: self.documentManager.convertFileIdToRef(fileId),
'convert': lambda data, format: self.documentManager.convertDataFormat(data, format),
'createAgentInputFiles': lambda files: self.documentManager.createAgentInputFileList(files),
'saveAgentOutputFiles': lambda files: self.documentManager.saveAgentOutputFiles(files)
}
# Add workflow context
service.workflow = {
'activeTask': {
'id': None,
'progress': 0,
'status': 'pending'
},
'tasks': []
}
def _extractFileContent(self, file):
"""Extract content from a file for agent processing."""
try:
fileData = self.service.functions.getFileData(file['id'])
if fileData is None:
return None
# Handle base64 encoded content
if file.get('base64Encoded', False):
import base64
return base64.b64decode(fileData)
# Handle text content
if isinstance(fileData, bytes):
return fileData.decode('utf-8')
return fileData
except Exception as e:
logger.error(f"Error extracting file content: {str(e)}")
return None
def _convertFileRefToId(self, ref):
"""Convert agent file reference to file ID."""
try:
# Extract file ID from reference format
if isinstance(ref, str) and ';' in ref:
return int(ref.split(';')[1])
return int(ref)
except Exception as e:
logger.error(f"Error converting file reference to ID: {str(e)}")
return None
def _convertFileIdToRef(self, fileId):
"""Convert file ID to agent file reference."""
try:
file = self.service.functions.getFile(fileId)
if not file:
return None
return f"{file['name']};{fileId}"
except Exception as e:
logger.error(f"Error converting file ID to reference: {str(e)}")
return None
def _convertDataFormat(self, data, format):
"""Convert data between different formats."""
try:
if format == 'json':
if isinstance(data, str):
return json.loads(data)
return json.dumps(data)
elif format == 'base64':
import base64
if isinstance(data, str):
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
return base64.b64encode(data).decode('utf-8')
return data
except Exception as e:
logger.error(f"Error converting data format: {str(e)}")
return data
def _createAgentInputFileList(self, files):
"""Create a list of input files for agent processing."""
try:
inputFiles = []
for file in files:
fileId = self._convertFileRefToId(file)
if fileId:
fileData = self.service.functions.getFile(fileId)
if fileData:
inputFiles.append({
'id': fileId,
'name': fileData['name'],
'mimeType': fileData['mimeType'],
'content': self._extractFileContent(fileData)
})
return inputFiles
except Exception as e:
logger.error(f"Error creating agent input file list: {str(e)}")
return []
def _saveAgentOutputFiles(self, files):
"""Save output files from agent processing."""
try:
savedFiles = []
for file in files:
# Create file metadata
fileMeta = self.service.functions.createFile(
name=file['name'],
mimeType=file.get('mimeType', 'application/octet-stream'),
size=len(file['content'])
)
if fileMeta and 'id' in fileMeta:
# Save file content
if self.service.functions.createFileData(fileMeta['id'], file['content']):
savedFiles.append({
'id': fileMeta['id'],
'name': file['name'],
'mimeType': file.get('mimeType', 'application/octet-stream')
})
return savedFiles
except Exception as e:
logger.error(f"Error saving agent output files: {str(e)}")
return []
async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow:
"""Starts a new workflow or continues an existing one."""
# 1. Initialize workflow or load existing one
workflow = self.workflowInit(workflowId)
self.logAdd(workflow, "Starting workflow processing", level="info", progress=0)
# Start asynchronous processing
asyncio.create_task(self.workflowProcess(userInput, workflow))
return workflow
def checkExitCriteria(self, workflow: ChatWorkflow) -> None:
"""
Check if the workflow should exit based on the current state.
Raises WorkflowStoppedException if workflow should stop.
Args:
workflow: ChatWorkflow object to check
"""
current_workflow = self.service.functions.loadWorkflowState(workflow.id)
if current_workflow["status"] in ["stopped", "failed"]:
self.logAdd(workflow, f"Workflow processing terminated due to status: {current_workflow['status']}", level="info")
# Raise an exception to stop execution
raise WorkflowStoppedException(f"Workflow execution stopped due to status: {current_workflow['status']}")
async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatWorkflow:
"""
Main processing function that implements the workflow state machine.
Handles the complete workflow process from user input to final response.
Args:
userInput: User input with prompt and optional file list
workflow: Current ChatWorkflow object
Returns:
Updated ChatWorkflow object with processing results
"""
startTime = time.time()
try:
# State 3: User Message Processing
self.checkExitCriteria(workflow)
messageUser = await self.chatMessageToWorkflow("user", None, {
"prompt": userInput.prompt,
"listFileId": userInput.listFileId
}, workflow)
messageUser.status = "first" # For first message
# State 4: Project Manager Analysis
self.checkExitCriteria(workflow)
self.logAdd(workflow, "Analyzing request and planning work", level="info", progress=10)
projectManagerResponse = await self.projectManagerAnalysis(messageUser, workflow)
objFinalDocuments = projectManagerResponse.get("objFinalDocuments", [])
objWorkplan = projectManagerResponse.get("objWorkplan", [])
objUserResponse = projectManagerResponse.get("objUserResponse", "")
# Get detected language and set it in the serviceBase interface
self.checkExitCriteria(workflow)
userLanguage = projectManagerResponse.get("userLanguage", "en")
workflow.userLanguage = userLanguage
self.service.functions.setUserLanguage(userLanguage)
# Save the response as a message in the workflow and add log entries
self.checkExitCriteria(workflow)
responseMessage = ChatMessage(
id=str(uuid.uuid4()),
workflowId=workflow.id,
agentName="Project Manager",
message=objUserResponse,
role="assistant",
status="step",
sequenceNr=len(workflow.messages) + 1,
startedAt=datetime.now(UTC).isoformat(),
finishedAt=datetime.now(UTC).isoformat(),
success=True
)
workflow.messages.append(responseMessage)
# Add detailed log entry about the task plan
taskPlanLog = "Input: "
if objFinalDocuments:
taskPlanLog += ", ".join(objFinalDocuments) + "
"
else:
taskPlanLog += "No input files
"
# Work Plan Steps
for i, task in enumerate(objWorkplan, 1):
agentName = task.get("agent", "unknown")
taskPlanLog += f"{i}. Agent {agentName}
"
# Input Documents
inputDocs = task.get("inputDocuments", [])
if inputDocs:
inputLabels = [doc.get("label", "unknown") for doc in inputDocs]
taskPlanLog += f"- Input: {', '.join(inputLabels)}
"
# Task Prompt
prompt = task.get('prompt', 'No prompt')
taskPlanLog += f"- Task: {prompt}
"
# Output Documents
outputDocs = task.get("outputDocuments", [])
if outputDocs:
outputLabels = [doc.get("label", "unknown") for doc in outputDocs]
taskPlanLog += f"- Output: {', '.join(outputLabels)}
"
# Final Results
taskPlanLog += "Result: "
if objFinalDocuments:
taskPlanLog += ", ".join(objFinalDocuments)
else:
taskPlanLog += "No result files"
self.logAdd(workflow, taskPlanLog, level="info", progress=25)
# State 5: Agent Execution
objResults = []
if objWorkplan:
totalTasks = len(objWorkplan)
for taskIndex, task in enumerate(objWorkplan):
self.checkExitCriteria(workflow)
agentName = task.get("agent", "unknown")
progressValue = 30 + int((taskIndex / totalTasks) * 60) # Progress from 30% to 90%
progressMsg = f"Running task {taskIndex+1}/{totalTasks}: {agentName}"
self.logAdd(workflow, progressMsg, level="info", progress=progressValue)
taskResults = await self.agentProcessing(task, workflow)
objResults.extend(taskResults)
# Log completion of this task
self.logAdd(
workflow,
f"Completed task {taskIndex+1}/{totalTasks}: {agentName}",
level="info",
progress=progressValue + (60/totalTasks)/2
)
# State 6: Final Response Generation
self.checkExitCriteria(workflow)
self.logAdd(workflow, "Creating final response", level="info", progress=90)
finalMessage = await self.generateFinalMessage(objUserResponse, objFinalDocuments, objResults)
finalMessage.status = "last" # As per state machine specification
workflow.messages.append(finalMessage)
# State 7: Workflow Completion
self.checkExitCriteria(workflow)
self.workflowFinish(workflow)
# Update processing time
endTime = time.time()
workflow.stats.processingTime = endTime - startTime
# Update workflow in database
self.service.functions.updateWorkflow(workflow.id, {
"status": workflow.status,
"lastActivity": workflow.lastActivity,
"stats": workflow.stats.model_dump(),
"messages": [msg.model_dump() for msg in workflow.messages]
})
return workflow
except Exception as e:
# State 2: Workflow Exception
logger.error(f"Workflow processing error: {str(e)}", exc_info=True)
workflow.status = "failed"
workflow.lastActivity = datetime.now(UTC).isoformat()
# Update processing time even on error
endTime = time.time()
workflow.stats.processingTime = endTime - startTime
# Update in database
self.service.functions.updateWorkflow(workflow.id, {
"status": "failed",
"lastActivity": workflow.lastActivity,
"stats": workflow.stats.model_dump()
})
self.logAdd(workflow, f"Workflow failed: {str(e)}", level="error", progress=100)
return workflow
def workflowInit(self, workflowId: Optional[str] = None) -> ChatWorkflow:
"""
Initializes a workflow or loads an existing one with round counting (State 1: Workflow Initialization).
Args:
workflowId: Optional - ID of the workflow to load
Returns:
Initialized ChatWorkflow object
"""
currentTime = datetime.now().isoformat()
workflowExist = self.service.functions.getWorkflow(workflowId)
if workflowId is None or not workflowExist:
# Create new workflow
newWorkflowId = str(uuid.uuid4()) if workflowId is None else workflowId
workflow = ChatWorkflow(
id=newWorkflowId,
mandateId=self.functions.mandateId,
status="running",
name=f"Workflow {newWorkflowId[:8]}",
startedAt=currentTime,
messages=[], # Empty list - will be filled with references
logs=[],
stats=ChatStat(
bytesSent=0,
bytesReceived=0,
tokensUsed=0,
processingTime=0.0
),
currentRound=1,
lastActivity=currentTime,
)
# Save to database - only the workflow metadata
workflowDb = workflow.model_dump()
self.service.functions.createWorkflow(workflowDb)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["init"], level="info", progress=0)
logger.debug(f"CHECK DATA {workflow}")
return workflow
else:
# State 10: Workflow Resumption - Load existing workflow
workflow = self.service.functions.loadWorkflowState(workflowId)
workflow = ChatWorkflow(**workflow)
# Update status and increment round counter
workflow.status = "running"
workflow.lastActivity = currentTime
# Increment currentRound if it exists, otherwise set it to 1
workflow.currentRound = (workflow.currentRound or 0) + 1
# Ensure stats exists with correct field names
if not workflow.stats:
workflow.stats = ChatStat(
bytesSent=0,
bytesReceived=0,
tokensUsed=0,
processingTime=0.0
)
elif "tokenCount" in workflow.stats:
# Convert old tokenCount to tokensUsed if needed
workflow.stats.tokensUsed = workflow.stats.pop("tokenCount", 0)
# Update in database - only the relevant workflow fields
workflowUpdate = {
"status": workflow.status,
"lastActivity": workflow.lastActivity,
"currentRound": workflow.currentRound,
"stats": workflow.stats.model_dump() # Include updated stats
}
self.service.functions.updateWorkflow(workflowId, workflowUpdate)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["running"], level="info", progress=0)
return workflow
def workflowFinish(self, workflow: ChatWorkflow) -> ChatWorkflow:
"""
Finalizes a workflow and sets the status to 'completed' (State 7: Workflow Completion).
Args:
workflow: ChatWorkflow object
Returns:
Updated ChatWorkflow object
"""
# Prepare workflow update data
workflowUpdate = {
"status": "completed",
"lastActivity": datetime.now().isoformat(),
}
# Update the workflow object in memory
workflow.status = workflowUpdate["status"]
workflow.lastActivity = workflowUpdate["lastActivity"]
# Save workflow state to database - only relevant fields
self.service.functions.updateWorkflow(workflow.id, workflowUpdate)
self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["completed"], level="info", progress=100)
return workflow
async def projectManagerAnalysis(self, messageUser: ChatMessage, workflow: ChatWorkflow) -> Dict[str, Any]:
"""
Creates the prompt for the project manager and processes the response (State 4: Project Manager Analysis).
Args:
messageUser: Message object with user request
workflow: Current workflow object
Returns:
Project manager's response with objFinalDocuments, objWorkplan and objUserResponse
"""
# Get available agents with their capabilities
availableAgents = self.agentProfiles()
# Create a workflow summary
workflowSummary = await self.workflowSummarize(workflow, messageUser)
# Create a list of currently available documents from user input or previously generated documents
availableDocuments = self.getAvailableDocuments(workflow, messageUser)
availableDocsStr = json.dumps(availableDocuments, indent=2)
# Create the prompt for the project manager with language detection requirement
prompt = f"""
Based on the user request and the provided documents, please analyze the requirements and create a processing plan.
Also, identify the language of the user's request and include it in your response.
{messageUser.content}
# Previous conversation history:
{workflowSummary}
# Available documents (currently in workflow):
{availableDocsStr}
# Available agents and their capabilities:
{self.parseJson2text(availableAgents)}
Please analyze the request and create:
1. A list of required result documents (objFinalDocuments)
2. A plan for executing agents (objWorkplan)
3. A clear response to the user explaining what you're doing (objUserResponse)
4. Identified language of the user's request (userLanguage)
## IMPORTANT RULES FOR THE WORKPLAN:
1. Each input document must either already exist (provided by the user or previously created by an agent) or be created by an agent before it's used.
2. Document data is already extracted for the agent based on your prompt to the agent. He does not need to do this again.
3. Do not define document inputs that don't exist or haven't been generated beforehand.
4. Create a logical sequence - earlier agents can create documents that are later used as inputs.
5. If the user has provided documents but hasn't clearly stated what they want, try to act according to the context.
6. ALL documents provided by the user (where fileSource is "user") MUST be included in the work plan, even if they don't have content summaries or if content extraction failed.
## AGENT SELECTION GUIDELINES:
1. Carefully analyze the task requirements and match them with agent capabilities
2. Consider the type of operation needed (data processing, analysis, documentation, etc.)
3. Review each agent's capabilities and select the most appropriate one for the task
4. Ensure the selected agent has the necessary capabilities to handle the input and output formats
5. If multiple agents could handle the task, choose the one with the most specific capabilities for the task
Your answer must be strictly in the JSON_OUTPUT format, with no additions before or after the JSON object.
JSON_OUTPUT = {{
"objFinalDocuments": ["label",...], # document label in the format 'filename.ext'
"objWorkplan": [
{{
"agent": "agentName", # Name of an available agent
"prompt": "Specific instructions to the agent, that he knows what to do with which documents and which output to provide."
"outputDocuments": [
{{
"label":"document label in the format 'filename.ext'",
"prompt":"AI prompt to describe the content of the file"
}}
],
"inputDocuments": [
{{
"label":"document label in the format 'filename.ext'",
"fileId":id, # if refering to an existing document, provide fileId to select the correct file
"contentPart":"", # provide empty string, if all document contents to consider, otherwise the contentPart of the document to focus on
"prompt":"AI prompt to describe what data to extract from the file."
}}
], # If no input documents are needed, include "inputDocuments" as an empty list
}}
# Multiple agent tasks can be added here and should build logically on each other
],
"objUserResponse": "Information to the user about how his request will be solved, in the language of the user's request.",
"userLanguage": "en" # Language code (e.g., en, de, fr, es) based on the user's request
}}
## RULES for inputDocuments:
1. The user request refers to documents where "fileSource" in available documents is "user". Those documents are in the focus for input
2. In case of redundant label in available documents, use document with highest sequenceNr if not specified differently
3. ALL documents provided by the user MUST be included in the work plan, even if they don't have content summaries or if content extraction failed
## STRICT RULES FOR document "label":
1. Every document label MUST include a proper file extension that matches the content type.
2. Use standard extensions like:
- ".txt" for text files
- ".md" for markdown files
- ".csv" for comma-separated values
- ".json" for JSON data
- ".html" for HTML content
- ".jpg" or ".png" for images
- ".docx" for Word documents
- ".xlsx" for Excel files
- ".pdf" for PDF documents
3. Use descriptive filenames that indicate the document's purpose (e.g., "analysis_report.txt" rather than just "report.txt")
4. If you use label for an existing file
"""
# Call the AI service through serviceBase for language support
logger.debug(f"PROJECT MANAGER Planning prompt: {prompt}")
projectManagerOutput = await self.service.functions.callAi([
{
"role": "system",
"content": "You are an experienced project manager who analyzes user requests and creates work plans. You pay very careful attention to ensure that all document dependencies are correct and that no non-existent documents are defined as inputs. The output follows strictly the specified format."
},
{
"role": "user",
"content": prompt
}
])
# Parse the JSON response
logger.debug(f"PROJECT MANAGER Planning answer: {projectManagerOutput}")
return self.parseJsonResponse(projectManagerOutput)
async def agentProcessing(self, task: Dict[str, Any], workflow: ChatWorkflow) -> List[Dict[str, Any]]:
"""
Process a single agent task from the workflow (State 5: Agent Execution).
Uses the new Task and AgentResponse models.
Args:
task: The task definition containing agent name, prompt, and document specifications
workflow: The current workflow object
Returns:
List of document objects created by the agent
"""
try:
# Create Task object
task_obj = Task(
id=str(uuid.uuid4()),
workflowId=workflow.id,
agentName=task.get("agent"),
status="pending",
progress=0.0,
prompt=task.get("prompt", ""),
filesInput=task.get("inputDocuments", []),
filesOutput=task.get("outputDocuments", []),
userLanguage=workflow.userLanguage
)
# Execute agent
response, updated_task = await self.agentManager.executeAgent(task_obj)
# Update workflow stats
if response.performance:
workflow.stats.tokensUsed += response.performance.get("tokensUsed", 0)
workflow.stats.bytesSent += response.performance.get("bytesSent", 0)
workflow.stats.bytesReceived += response.performance.get("bytesReceived", 0)
# Update in database
self.service.functions.updateWorkflow(workflow.id, {
"stats": workflow.stats.model_dump()
})
# Log the agent response
self.logAdd(
workflow,
f"Agent {task.get('agent')} completed task. Feedback: {response.message.message if response.message else 'No feedback provided'}",
level="info"
)
# Create a message in the workflow with the agent's response
agentMessage = await self.chatMessageToWorkflow("assistant", task.get("agent"), {
"prompt": response.message.message if response.message else "",
"listFileId": response.message.documents if response.message else []
}, workflow)
agentMessage.status = "step" # As per state machine specification
return agentMessage.documents
except Exception as e:
errorMsg = f"Error executing agent '{task.get('agent')}': {str(e)}"
logger.error(errorMsg, exc_info=True)
self.logAdd(workflow, errorMsg, level="error")
return []
async def generateFinalMessage(self, objUserResponse: str, objFinalDocuments: List[str], objResults: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Creates the final response message with review of promised and delivered documents (State 6: Final Response Generation).
Args:
objUserResponse: Initial text response to the user
objFinalDocuments: List of expected response documents
objResults: List of generated result documents
Returns:
Complete message object with content and relevant documents
"""
# Find documents that match the objFinalDocuments requirements
matchingDocuments = []
if len(objFinalDocuments) > 0:
for answerLabel in objFinalDocuments:
# Find matching document in results
for doc in objResults:
docName = self.getFilename(doc)
# Check if this document matches the answer specification
if docName == answerLabel:
contentRef = []
for c in doc.get("contents", []):
contentRef.append(c.get("summary", ""))
docRef = {
"label": docName,
"contentSummary": contentRef
}
matchingDocuments.append(docRef)
break
# Use the serviceBase for language-aware AI calls
finalPrompt = await self.service.functions.callAi([
{"role": "system", "content": "You are a project manager, who delivers results to a user."},
{"role": "user", "content": f"""
Give the final short feedback to the user with reference to the initial statement (objUserResponse). Inform him about the list of filesDelivered. You do not need to send the files, this is handled separately. If in the list of filesDelivered some files_promised would be missing, just give a comment on this, otherwise task is now completed successfully.
Here the data:
objUserResponse = {self.parseJson2text(objUserResponse)}
filesPromised = {self.parseJson2text(objFinalDocuments)}
filesDelivered = {self.parseJson2text(matchingDocuments)}
"""
}
], produceUserAnswer=True)
# Create basic message structure with proper fields
logger.debug(f"FINAL PROMPT = {self.parseJson2text(finalPrompt)}.")
finalMessage = {
"role": "assistant",
"agentName": "Project Manager",
"content": finalPrompt,
"documents": [] # DO NOT include the results documents, already with agents
}
logger.debug(f"FINAL MESSAGE = {self.parseJson2text(finalMessage)}.")
return finalMessage
async def workflowSummarize(self, workflow: ChatWorkflow, messageUser: ChatMessage) -> str:
"""
Creates a summary of the workflow without the current user message.
Args:
workflow: Workflow object
messageUser: Current user message
Returns:
Summary of the workflow
"""
if not workflow or "messages" not in workflow or not workflow["messages"]:
return "" # First message
# Go through messages in chronological order
messages = sorted(workflow["messages"], key=lambda m: m.get("sequenceNo", 0), reverse=False)
summaryParts = []
for message in messages:
if message["id"] != messageUser["id"]:
messageSummary = await self.messageSummarize(message)
summaryParts.append(messageSummary)
return "\n\n".join(summaryParts)
async def messageSummarize(self, message: ChatMessage) -> str:
"""
Creates a summary of a message including its documents.
Args:
message: Message to summarize
Returns:
Summary of the message
"""
role = message.role
agentName = message.agentName
content = message.content
try:
# Use the serviceBase for language-aware AI calls
contentSummary = await self.service.functions.callAi([
{"role": "system", "content": f"You are a chat message summarizer. Create a very concise summary (2-3 sentences, maximum 300 characters)"},
{"role": "user", "content": content}
])
except Exception as e:
logger.error(f"Error creating summary: {str(e)}")
contentSummary = content[:200] + "..."
# Summarize documents
docsSummary = ""
if "documents" in message and message["documents"]:
docsList = []
for i, doc in enumerate(message["documents"]):
docName = self.getFilename(doc)
docsList.append(docName)
if docsList:
docsSummary = "\nDocuments:" + "\n- ".join(docsList)
return f"[{role} {agentName}]: {contentSummary}{docsSummary}"
async def chatMessageToWorkflow(self, role: str, agent: Union[str, Dict[str, Any]], chatMessage: Dict[str, Any], workflow: ChatWorkflow) -> ChatMessage:
"""
Integrates user inputs into a Message object including files with complete contents.
Uses DocumentManager for file processing.
Args:
role: Role of the message sender ('user' or 'assistant')
agent: Agent name or object
chatMessage: Input data with "prompt"=str, "listFileId"=[]
workflow: Current workflow object
Returns:
Message object with content and documents including contents
"""
agentName = agent if isinstance(agent, str) else agent.name if agent else ""
agentLabel = agent.label if hasattr(agent, 'label') else agentName
logger.info(f"Message from {role} {agentName} sent with {len(chatMessage.get('listFileId', []))} documents")
# Check message content
messageContent = chatMessage.get("prompt", "")
if isinstance(messageContent, dict) and "content" in messageContent:
messageContent = messageContent["content"]
# If message content is empty, no chat
if role == "user" and (messageContent is None or messageContent.strip() == ""):
logger.warning(f"Empty message, no chat")
messageContent = "(No user input received)"
# Process additional files with complete contents
additionalFileIds = chatMessage.get("listFileId", [])
additionalFiles = await self.documentManager.processFileIds(additionalFileIds)
# Create message object
messageObject = ChatMessage(
id=str(uuid.uuid4()),
workflowId=workflow.id,
agentName=agentLabel,
message=messageContent,
role=role,
status=chatMessage.get("status", "step"),
sequenceNr=len(workflow.messages) + 1,
startedAt=datetime.now(UTC).isoformat(),
finishedAt=datetime.now(UTC).isoformat(),
success=True,
documents=additionalFiles
)
# Add message to workflow
workflow.messages.append(messageObject)
# Update workflow in database
self.service.functions.updateWorkflow(workflow.id, {
"messages": [msg.model_dump() for msg in workflow.messages]
})
return messageObject
def messageAdd(self, workflow: ChatWorkflow, message: ChatMessage) -> ChatMessage:
"""
Adds a message to the workflow and updates lastActivity.
Saves the message in the database and updates the workflow with references.
Also updates statistics for the message.
Args:
workflow: ChatWorkflow object
message: Message data to be saved
Returns:
Added ChatMessage object
"""
currentTime = datetime.now().isoformat()
# Generate new message ID if not present
if message.id is None:
message.id = f"msg_{str(uuid.uuid4())}"
# Add workflow ID and timestamps
message.workflowId = workflow.id
message.startedAt = currentTime
message.finishedAt = currentTime
# Set sequence number
message.sequenceNo = len(workflow.messages) + 1
# Ensure required fields are present
if message.role is None:
# Set a default role based on agentName
message.role = "assistant" if message.agentName else "user"
if message.agentName is None:
message.agentName = ""
# Set status if not present
if message.status is None:
message.status = "step"
# Calculate statistics for the message
bytesSent = len(message.content.encode('utf-8'))
for doc in message.documents:
if doc.data:
bytesSent += len(doc.data.encode('utf-8'))
for content in doc.contents:
if content.data:
bytesSent += len(content.data.encode('utf-8'))
# Calculate tokens used (now using bytes)
tokensUsed = bytesSent
# Update workflow statistics
if not workflow.stats:
workflow.stats = ChatStat(
bytesSent=0,
bytesReceived=0,
tokensUsed=0,
processingTime=0
)
# Update statistics based on message role
if message.role == "user":
workflow.stats.bytesSent += bytesSent
workflow.stats.tokensUsed += tokensUsed
else: # assistant messages
workflow.stats.bytesReceived += bytesSent
workflow.stats.tokensUsed += tokensUsed
# Create ChatMessage object
chatMessage = ChatMessage(**message.model_dump())
# Add message to workflow
workflow.messages.append(chatMessage)
# Ensure messageIds list exists
if not workflow.messageIds:
workflow.messageIds = []
# Add message ID to the messageIds list
workflow.messageIds.append(chatMessage.id)
# Update workflow status
workflow.lastActivity = currentTime
# Save to database - first the message itself
self.service.functions.createWorkflowMessage(chatMessage.model_dump())
# Then save the workflow with updated references and statistics
workflowUpdate = {
"lastActivity": currentTime,
"messageIds": workflow.messageIds,
"stats": workflow.stats.model_dump() # Include updated statistics
}
self.service.functions.updateWorkflow(workflow.id, workflowUpdate)
return chatMessage
def _trimDataInJson(self, jsonObj: Any) -> Any:
"""
Trims the data attribute in JSON objects while preserving other content.
Args:
jsonObj: JSON object to process
Returns:
Processed JSON object with trimmed data attribute
"""
if isinstance(jsonObj, dict):
# Create a copy to avoid modifying the original
result = jsonObj.copy()
if 'data' in result:
# Trim data attribute if it's a string
if isinstance(result['data'], str):
result['data'] = result['data'][:100] + '...'
# If it's a dict or list, convert to string and trim
else:
result['data'] = str(result['data'])[:100] + '...'
return result
return jsonObj
def logAdd(self, workflow: ChatWorkflow, message: str, level: str = "info",
progress: Optional[int] = None) -> str:
"""
Add a log entry to the workflow.
Args:
workflow: ChatWorkflow object
message: Log message
level: Log level (info, warning, error)
progress: Optional progress percentage
Returns:
str: ID of the created log entry
"""
try:
# Generate log ID
logId = str(uuid.uuid4())
# Create log entry
logEntry = ChatLog(
id=logId,
workflowId=workflow.id,
message=message,
level=level,
progress=progress,
timestamp=datetime.now().isoformat()
)
# Add to workflow logs
workflow.logs.append(logEntry)
# Also log to Python logger
logLevel = getattr(logging, level.upper())
logger.log(logLevel, f"[Workflow {workflow.id}] {message}")
# Save to database
self.service.functions.saveWorkflowLog(workflow.id, logEntry.model_dump())
return logId
except Exception as e:
logger.error(f"Error adding log entry: {str(e)}")
return ""
def saveAgentDocuments(self, agentResults: Dict[str, Any]) -> List[int]:
"""
Saves all documents from agent results as files and returns a list of file IDs.
Enhanced to handle the standardized document format from agents with base64Encoded flag.
Args:
agentResults: Dictionary containing agent feedback and documents
Returns:
List of file IDs for the saved documents
"""
fileIds = []
used_names = set() # Track used names to prevent duplicates
# Extract documents from agent results
documents = agentResults.get("documents", [])
for doc in documents:
try:
# Extract document data according to LucyDOM model
name = doc.name
ext = doc.ext
data = doc.data
base64Encoded = doc.base64Encoded
# Skip if no name or data
if not name or not data:
logger.warning(f"Skipping document with missing name or data. Name: {name}, Has data: {bool(data)}")
continue
# Ensure unique filename
base_name = name
counter = 1
while f"{base_name}.{ext}" in used_names:
base_name = f"{name}_{counter}"
counter += 1
used_names.add(f"{base_name}.{ext}")
# Convert content to bytes based on base64Encoded flag
if isinstance(data, str):
if base64Encoded:
# Decode base64 to bytes
try:
import base64
fileContent = base64.b64decode(data)
except Exception as e:
logger.warning(f"Failed to decode base64 content: {str(e)}")
fileContent = data.encode('utf-8')
base64Encoded = False
else:
# Convert text to bytes
fileContent = data.encode('utf-8')
else:
# Already bytes
fileContent = data
# Determine MIME type based on extension
mimeType = self.service.functions.getMimeType(f"{base_name}.{ext}")
# Create file metadata
fileMeta = self.service.functions.createFile(
name=base_name,
mimeType=mimeType,
size=len(fileContent)
)
if fileMeta and "id" in fileMeta:
# Save file content
if self.service.functions.createFileData(fileMeta["id"], fileContent):
fileIds.append(fileMeta["id"])
logger.info(f"Saved document '{base_name}.{ext}' with file ID: {fileMeta['id']} (base64Encoded: {base64Encoded})")
else:
logger.warning(f"Failed to save content for document '{base_name}.{ext}'")
else:
logger.warning(f"Failed to create file metadata for '{base_name}.{ext}'")
except Exception as e:
logger.error(f"Error saving document from agent results: {str(e)}")
# Continue with other documents instead of failing
continue
return fileIds
def getAvailableDocuments(self, workflow: ChatWorkflow, messageUser: ChatMessage) -> List[Dict[str, Any]]:
"""
Determines all currently available documents from user input and already generated documents.
Args:
messageUser: Current message from the user
workflow: Current workflow object
Returns:
List with information about all available documents, sorted by message sequenceNr in descending order
"""
availableDocs = []
if "messages" in workflow and workflow["messages"]:
for message in workflow["messages"]:
messageId = message.id
sequenceNr = message.sequenceNo
# Determine source
source = "user" if messageId == messageUser.id else "workflow"
# Process documents in this message
if "documents" in message and message["documents"]:
for doc in message["documents"]:
# Get filename using our helper method
filename = self.getFilename(doc)
fileId = doc.fileId
# Extract summaries from all contents
contentSummaries = []
if "contents" in doc and doc["contents"]:
for content in doc["contents"]:
contentSummaries.append({
"contentPart": content.name,
"metadata": content.metadata,
"summary": content.summary,
})
else:
# Add a default content summary if no contents exist
contentSummaries.append({
"contentPart": "1_undefined",
"metadata": "",
"summary": "No content extracted",
})
# Create document info
docInfo = {
"sequenceNr": sequenceNr,
"fileSource": source,
"fileId": fileId,
"messageId": messageId,
"label": filename,
"contentSummaryList": contentSummaries,
}
availableDocs.append(docInfo)
# Sort by message sequenceNr in descending order (newest first)
availableDocs.sort(key=lambda x: x["sequenceNr"], reverse=True)
logger.info(f"Available documents: {len(availableDocs)}")
return availableDocs
def agentProfiles(self) -> List[Dict[str, Any]]:
"""
Gets information about all available agents.
Returns:
List with information about all available agents
"""
return self.agentManager.getAgentInfos()
def getFilename(self, document: ChatDocument) -> str:
"""
Gets the filename from a document by combining name and extension.
Args:
document: Document object
Returns:
Filename with extension
"""
name = document.name
ext = document.ext
if ext:
return f"{name}.{ext}"
return name
def parseJson2text(self, jsonObj: Any) -> str:
"""
Converts a JSON object to a readable text representation.
Args:
jsonObj: JSON object to convert
Returns:
Formatted text representation
"""
if not jsonObj:
return "No data available"
try:
# Format with indentation for better readability
return json.dumps(jsonObj, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Error in JSON conversion: {str(e)}")
return str(jsonObj)
def parseJsonResponse(self, responseText: str) -> Dict[str, Any]:
"""
Parses the JSON response from a text.
Args:
responseText: Text with JSON content
Returns:
Parsed JSON data
"""
try:
# Extract JSON from the text (if mixed with other content)
jsonStart = responseText.find('{')
jsonEnd = responseText.rfind('}') + 1
if jsonStart >= 0 and jsonEnd > jsonStart:
jsonStr = responseText[jsonStart:jsonEnd]
return json.loads(jsonStr)
else:
# Try to parse the entire text
return json.loads(responseText)
except json.JSONDecodeError as e:
logger.error(f"JSON parsing error: {str(e)}")
# Fallback: Return empty structure
return {
"objFinalDocuments": [],
"objWorkplan": [],
"objUserResponse": "Sorry, I could not parse your data.",
"userLanguage": "en"
}
def _createWorkflowData(self, workflow: ChatWorkflow) -> Dict[str, Any]:
"""Creates a workflow data structure."""
return {
"mandateId": self.functions.mandateId,
"userId": self.functions.userid,
"name": workflow.name,
"status": workflow.status,
"startedAt": workflow.startedAt,
"lastActivity": workflow.lastActivity,
"stats": workflow.stats.model_dump()
}
def _checkFileAccess(self, fileId: int) -> bool:
"""Checks if the current user has access to a file."""
file = self.service.functions.getFile(fileId)
if not file:
return False
if file.get("mandateId") != self.functions.mandateId:
logger.warning(f"File {fileId} does not belong to mandate {self.functions.mandateId}")
return False
return True
# Singleton factory for the WorkflowManager
_workflowManagers = {}
_workflowManagerLastAccess = {} # Track last access time for cleanup
async def getWorkflowManager(service) -> WorkflowManager:
"""Get or create a workflow manager instance."""
contextKey = f"{service.functions.mandateId}_{service.functions.userId}"
# Check if we have a cached instance
if contextKey in _workflowManagers:
_workflowManagerLastAccess[contextKey] = time.time()
return _workflowManagers[contextKey]
# Create new instance
manager = WorkflowManager(service)
# Cache the instance
_workflowManagers[contextKey] = manager
_workflowManagerLastAccess[contextKey] = time.time()
return manager