1346 lines
No EOL
56 KiB
Python
1346 lines
No EOL
56 KiB
Python
"""
|
|
Workflow Manager Module for state machine-based backend chat workflow.
|
|
Implements the state machine as defined in the documentation.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import logging
|
|
import json
|
|
import re
|
|
import uuid
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, List, Optional, Union, Tuple
|
|
|
|
from modules.mimeUtils import isTextMimeType, determineContentEncoding
|
|
|
|
# Required imports
|
|
from modules.workflowAgentsRegistry import getAgentRegistry
|
|
from modules.lucydomInterface import getLucydomInterface as domInterface
|
|
from modules.documentProcessor import getDocumentContents
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)

# Global settings for the workflow management
GLOBAL_WORKFLOW_LABELS = dict(
    # Default system name for logs
    systemName="AI Assistant",
    # Log message per workflow status, keyed by the status name
    workflowStatusMessages=dict(
        init="Workflow initialized",
        running="Running workflow",
        waiting="Waiting for input",
        completed="Workflow completed successfully",
        stopped="Workflow stopped by user",
        failed="Error in workflow",
    ),
)
|
|
class WorkflowStoppedException(Exception):
    """Signals that a workflow was forcibly stopped; raised by checkExitCriteria()."""
|
|
|
|
class WorkflowManager:
|
|
"""
|
|
Manages the processing of chat requests, agent execution, and
|
|
the integration of results into the workflow, following a state machine approach.
|
|
"""
|
|
|
|
def __init__(self, mandateId: int, userId: int):
    """
    Binds the manager to one mandate/user pair and wires up its collaborators.

    Args:
        mandateId: ID of the current mandate
        userId: ID of the current user
    """
    self.mandateId, self.userId = mandateId, userId

    # Data interface created for exactly this mandate and user
    self.mydom = domInterface(mandateId, userId)

    # The agent registry receives the same interface instance via setMydom()
    registry = getAgentRegistry()
    self.agentRegistry = registry
    registry.setMydom(self.mydom)
|
|
|
|
|
|
### Workflow State Machine Implementation
|
|
|
|
async def workflowStart(self, userInput: Dict[str, Any], workflowId: Optional[str] = None) -> Dict[str, Any]:
    """
    Main entry point for starting or continuing a workflow (State 1: Workflow Initialization).

    Initializes a new workflow or loads an existing one based on workflowId,
    then schedules workflowProcess() as a background task and returns
    immediately without waiting for it.

    Args:
        userInput: User input with prompt and optional file list
        workflowId: Optional workflow ID to continue an existing workflow

    Returns:
        The workflow object produced by workflowInit()
    """
    # 1. Initialize workflow or load existing one
    workflow = self.workflowInit(workflowId)
    self.logAdd(workflow, "Starting workflow processing", level="info", progress=0)

    # 2. Start asynchronous processing.
    # The event loop keeps only weak references to tasks, so a task nobody
    # else references may be garbage-collected before it finishes. Hold a
    # strong reference until the task is done.
    task = asyncio.create_task(self.workflowProcess(userInput, workflow))
    backgroundTasks = getattr(self, "_backgroundTasks", None)
    if backgroundTasks is None:
        # Created lazily so instances built before this attribute existed keep working
        backgroundTasks = self._backgroundTasks = set()
    backgroundTasks.add(task)
    task.add_done_callback(backgroundTasks.discard)

    return workflow
|
|
|
|
### Forces exit
|
|
|
|
def checkExitCriteria(self, workflow: Dict[str, Any]):
    """
    Aborts processing when the stored workflow has been stopped or has failed.

    Reloads the workflow state through mydom and inspects its status.

    Args:
        workflow: Workflow object currently being processed

    Raises:
        WorkflowStoppedException: If the reloaded status is "stopped" or "failed"
    """
    persistedStatus = self.mydom.loadWorkflowState(workflow["id"])["status"]
    if persistedStatus not in ("stopped", "failed"):
        return

    self.logAdd(workflow, f"Workflow processing terminated due to status: {persistedStatus}", level="info")
    # Raise an exception to stop execution
    raise WorkflowStoppedException(f"Workflow execution stopped due to status: {persistedStatus}")
|
|
|
|
async def workflowStop(self, workflowId: str) -> Dict[str, Any]:
    """
    Stops a running workflow (State 8: Workflow Stopped).

    Marks the workflow as "stopped", stores the change and writes a log entry.

    Args:
        workflowId: ID of the workflow to stop

    Returns:
        Updated workflow with status="stopped", or an error dict with
        status="failed" when no workflow state could be loaded
    """
    workflow = self.mydom.loadWorkflowState(workflowId)
    if not workflow:
        return {"error": "Workflow not found", "status": "failed"}

    # The same two fields are applied in memory and sent to the database
    stoppedState = {
        "status": "stopped",
        "lastActivity": datetime.now().isoformat(),
    }
    for field, value in stoppedState.items():
        workflow[field] = value
    self.mydom.updateWorkflow(workflowId, stoppedState)

    stoppedLabel = GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["stopped"]
    self.logAdd(workflow, stoppedLabel, level="info", progress=100)
    return workflow
|
|
|
|
async def workflowProcess(self, userInput: Dict[str, Any], workflow: Dict[str, Any]) -> Dict[str, Any]:
    """
    Main processing function that implements the workflow state machine.
    Handles the complete workflow process from user input to final response.

    checkExitCriteria() is called between the individual steps so that a
    stop request aborts processing. Such an abort leaves the stored status
    untouched; any other exception marks the workflow as "failed".

    Args:
        userInput: User input with prompt and optional file list
        workflow: Current workflow object

    Returns:
        The workflow object that was passed in, updated in place
    """
    try:
        # State 3: User Message Processing
        self.checkExitCriteria(workflow)
        messageUser = await self.chatMessageToWorkflow("user", None, userInput, workflow)
        messageUser["status"] = "first"  # For first message

        # State 4: Project Manager Analysis
        self.checkExitCriteria(workflow)
        self.logAdd(workflow, "Analyzing request and planning work", level="info", progress=10)
        projectManagerResponse = await self.projectManagerAnalysis(messageUser, workflow)
        # "or" instead of a .get() default: the model may emit explicit nulls,
        # which a default would not replace and which would break len() below.
        objFinalDocuments = projectManagerResponse.get("objFinalDocuments") or []
        objWorkplan = projectManagerResponse.get("objWorkplan") or []
        objUserResponse = projectManagerResponse.get("objUserResponse") or ""

        # Get detected language and set it in the mydom interface
        self.checkExitCriteria(workflow)
        userLanguage = projectManagerResponse.get("userLanguage") or "en"
        self.mydom.setUserLanguage(userLanguage)

        # Save the response as a message in the workflow and add log entries
        self.checkExitCriteria(workflow)
        responseMessage = {
            "role": "assistant",
            "agentName": "Project Manager",
            "content": objUserResponse,
            "status": "step"  # As per state machine specification
        }
        self.messageAdd(workflow, responseMessage)

        self.logAdd(workflow, f"Planned outputs: {len(objFinalDocuments)} documents", level="info", progress=20)
        self.logAdd(workflow, f"Work plan created with {len(objWorkplan)} steps", level="info", progress=25)

        # State 5: Agent Execution
        objResults = []
        totalTasks = len(objWorkplan)
        for taskIndex, task in enumerate(objWorkplan):
            self.checkExitCriteria(workflow)

            agentName = task.get("agent", "unknown")
            progressValue = 30 + int((taskIndex / totalTasks) * 60)  # Progress from 30% to 90%

            progressMsg = f"Running task {taskIndex+1}/{totalTasks}: {agentName}"
            self.logAdd(workflow, progressMsg, level="info", progress=progressValue)

            taskResults = await self.agentProcessing(task, workflow)
            objResults.extend(taskResults)

            # Log completion of this task, half a task-step further along
            self.logAdd(
                workflow,
                f"Completed task {taskIndex+1}/{totalTasks}: {agentName}",
                level="info",
                progress=progressValue + (60/totalTasks)/2
            )

        # State 6: Final Response Generation
        self.checkExitCriteria(workflow)
        self.logAdd(workflow, "Creating final response", level="info", progress=90)
        finalMessage = await self.generateFinalMessage(objUserResponse, objFinalDocuments, objResults)
        finalMessage["status"] = "last"  # As per state machine specification
        self.messageAdd(workflow, finalMessage)

        # State 7: Workflow Completion
        self.checkExitCriteria(workflow)
        self.workflowFinish(workflow)

        return workflow

    except WorkflowStoppedException as e:
        # Deliberate abort raised by checkExitCriteria(), which has already
        # written the log entry. The stored status must NOT be overwritten
        # with "failed" here, otherwise a user stop is reported as an error.
        logger.info(f"Workflow processing halted: {str(e)}")
        return workflow

    except Exception as e:
        # State 2: Workflow Exception
        logger.error(f"Workflow processing error: {str(e)}", exc_info=True)
        workflow["status"] = "failed"
        workflow["lastActivity"] = datetime.now().isoformat()

        # Update in database
        self.mydom.updateWorkflow(workflow["id"], {
            "status": "failed",
            "lastActivity": workflow["lastActivity"]
        })

        self.logAdd(workflow, f"Workflow failed: {str(e)}", level="error", progress=100)
        return workflow
|
|
|
|
def workflowInit(self, workflowId: Optional[str] = None) -> Dict[str, Any]:
    """
    Initializes a workflow or loads an existing one with round counting (State 1: Workflow Initialization).

    A new workflow is created when no ID is given or when no workflow is
    stored under the given ID (in the latter case the given ID is reused).
    Otherwise the stored workflow is resumed: its status is set back to
    "running" and its round counter is incremented.

    Args:
        workflowId: Optional - ID of the workflow to load

    Returns:
        Initialized workflow object
    """
    currentTime = datetime.now().isoformat()

    # Only query the database when there is an ID to look up
    workflowExist = self.mydom.getWorkflow(workflowId) if workflowId is not None else None

    if not workflowExist:
        # Create new workflow
        newWorkflowId = str(uuid.uuid4()) if workflowId is None else workflowId
        workflow = {
            "id": newWorkflowId,
            "mandateId": self.mandateId,
            "userId": self.userId,
            "name": f"Workflow {newWorkflowId[:8]}",
            "startedAt": currentTime,
            "messages": [],  # Empty list - will be filled with references
            "messageIds": [],  # Initialize empty messageIds list
            "logs": [],
            "dataStats": {},
            "currentRound": 1,
            "status": "running",
            "lastActivity": currentTime,
        }

        # Save to database - only the workflow metadata ("messages" and "logs" are left out)
        persistedFields = (
            "id", "mandateId", "userId", "name", "startedAt", "status",
            "dataStats", "currentRound", "lastActivity", "messageIds",
        )
        self.mydom.createWorkflow({field: workflow[field] for field in persistedFields})

        self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["init"], level="info", progress=0)
        logger.debug(f"CHECK DATA {workflow}")
        return workflow

    # State 10: Workflow Resumption - Load existing workflow
    workflow = self.mydom.loadWorkflowState(workflowId)

    # Ensure messageIds exists
    if "messageIds" not in workflow:
        # Initialize from existing messages and store the result
        workflow["messageIds"] = [msg["id"] for msg in workflow.get("messages", [])]
        self.mydom.updateWorkflow(workflowId, {"messageIds": workflow["messageIds"]})

    # Update status and increment round counter (starts at 1 if it was missing)
    workflow["status"] = "running"
    workflow["lastActivity"] = currentTime
    workflow["currentRound"] = workflow.get("currentRound", 0) + 1

    # Update in database - only the relevant workflow fields
    self.mydom.updateWorkflow(workflowId, {
        "status": workflow["status"],
        "lastActivity": workflow["lastActivity"],
        "currentRound": workflow["currentRound"]
    })

    self.logAdd(workflow, GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["running"], level="info", progress=0)
    return workflow
|
|
|
|
def workflowFinish(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
    """
    Marks a workflow as 'completed' (State 7: Workflow Completion).

    Applies the final status and activity timestamp to the in-memory object,
    stores the same two fields and writes the completion log entry.

    Args:
        workflow: Workflow object

    Returns:
        Updated workflow object
    """
    completion = {
        "status": "completed",
        "lastActivity": datetime.now().isoformat(),
    }

    # Mirror the update on the in-memory object
    for field, value in completion.items():
        workflow[field] = value

    # Store only these fields
    self.mydom.updateWorkflow(workflow["id"], completion)

    completedLabel = GLOBAL_WORKFLOW_LABELS["workflowStatusMessages"]["completed"]
    self.logAdd(workflow, completedLabel, level="info", progress=100)
    return workflow
|
|
|
|
async def projectManagerAnalysis(self, messageUser: Dict[str, Any], workflow: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates the prompt for the project manager and processes the response (State 4: Project Manager Analysis).

    The prompt combines the user request, a summary of the earlier
    conversation, the documents currently available and the agent profiles,
    and asks for a JSON answer which is parsed with parseJsonResponse().

    Args:
        messageUser: Message object with user request
        workflow: Current workflow object

    Returns:
        Parsed project manager response; the prompt requests the keys
        objFinalDocuments, objWorkplan, objUserResponse and userLanguage
    """
    # Get available agents with their capabilities
    availableAgents = self.agentProfiles()

    # Create a workflow summary
    workflowSummary = await self.workflowSummarize(workflow, messageUser)

    # Create a list of currently available documents from user input or previously generated documents
    availableDocuments = self.getAvailableDocuments(workflow, messageUser)
    availableDocsStr = json.dumps(availableDocuments, indent=2)

    # Create the prompt for the project manager with language detection requirement.
    # The JSON_OUTPUT template now has the comma after the "prompt" entry of a
    # workplan step, so the example is consistent with the format it asks for.
    # NOTE(review): rule 4 of the document-label rules at the end of the prompt
    # breaks off mid-sentence ("If you use label for an existing file") - the
    # intended wording needs to be confirmed and completed.
    prompt = f"""
Based on the user request and the provided documents, please analyze the requirements and create a processing plan.
Also, identify the language of the user's request and include it in your response.

<userrequest>
{messageUser.get('content')}
</userrequest>

# Previous conversation history:

{workflowSummary}


# Available documents (currently in workflow):

{availableDocsStr}


# Available agents and their capabilities:

{self.parseJson2text(availableAgents)}


Please analyze the request and create:

1. A list of required result documents (objFinalDocuments)
2. A plan for executing agents (objWorkplan)
3. A clear response to the user explaining what you're doing (objUserResponse)
4. Identified language of the user's request (userLanguage)

## IMPORTANT RULES FOR THE WORKPLAN:
1. Each input document must either already exist (provided by the user or previously created by an agent) or be created by an agent before it's used.
2. If necessary, convert input documents to a suitable format using agents when the type doesn't match.
3. Do not define document inputs that don't exist or haven't been generated beforehand.
4. Create a logical sequence - earlier agents can create documents that are later used as inputs.
5. If the user has provided documents but hasn't clearly stated what they want, try to act according to the context.
6. ALL documents provided by the user (where fileSource is "user") MUST be included in the work plan, even if they don't have content summaries or if content extraction failed.

Your answer must be strictly in the JSON_OUTPUT format, with no additions before or after the JSON object.

JSON_OUTPUT = {{
"objFinalDocuments": ["label",...], # document label in the format 'filename.ext'
"objWorkplan": [
{{
"agent": "agent_name", # Name of an available agent
"prompt": "Specific instructions to the agent, that he knows what to do with which documents and which output to provide.",
"outputDocuments": [
{{
"label":"document label in the format 'filename.ext'",
"prompt":"AI prompt to describe the content of the file"
}}
],
"inputDocuments": [
{{
"label":"document label in the format 'filename.ext'",
"fileId":id, # if refering to an existing document, provide fileId to select the correct file
"contentPart":"", # provide empty string, if all document contents to consider, otherwise the contentPart of the document to focus on
"prompt":"AI prompt to describe what data to extract from the file."
}}
], # If no input documents are needed, include "inputDocuments" as an empty list
}}
# Multiple agent tasks can be added here and should build logically on each other
],
"objUserResponse": "Information to the user about how his request will be solved, in the language of the user's request.",
"userLanguage": "en" # Language code (e.g., en, de, fr, es) based on the user's request
}}

## RULES for inputDocuments:
1. The user request refers to documents where "fileSource" in available documents is "user". Those documents are in the focus for input
2. In case of redundant label in available documents, use document with highest sequenceNr if not specified differently
3. ALL documents provided by the user MUST be included in the work plan, even if they don't have content summaries or if content extraction failed

## STRICT RULES FOR document "label":
1. Every document label MUST include a proper file extension that matches the content type.
2. Use standard extensions like:
- ".txt" for text files
- ".md" for markdown files
- ".csv" for comma-separated values
- ".json" for JSON data
- ".html" for HTML content
- ".jpg" or ".png" for images
- ".docx" for Word documents
- ".xlsx" for Excel files
- ".pdf" for PDF documents
3. Use descriptive filenames that indicate the document's purpose (e.g., "analysis_report.txt" rather than just "report.txt")
4. If you use label for an existing file
"""

    # Call the AI service through mydom for language support
    logger.debug(f"PROJECT MANAGER Planning prompt: {prompt}")
    projectManagerOutput = await self.mydom.callAi([
        {
            "role": "system",
            "content": "You are an experienced project manager who analyzes user requests and creates work plans. You pay very careful attention to ensure that all document dependencies are correct and that no non-existent documents are defined as inputs. The output follows strictly the specified format."
        },
        {
            "role": "user",
            "content": prompt
        }
    ])

    # Parse the JSON response
    logger.debug(f"PROJECT MANAGER Planning answer: {projectManagerOutput}")
    return self.parseJsonResponse(projectManagerOutput)
|
|
|
|
async def agentProcessing(self, task: Dict[str, Any], workflow: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Runs one work plan task through its agent (State 5: Agent Execution).

    Looks the agent up in the registry, builds a standardized task object,
    calls the agent's processTask() and records the outcome as an assistant
    message in the workflow.

    Args:
        task: The task definition containing agent name, prompt, and document specifications
        workflow: The current workflow object

    Returns:
        The "documents" of the message created for the agent's answer; an
        empty list if the agent is unknown, the prompt is empty, or the
        execution raised an exception
    """
    requestedAgent = task.get("agent")
    instructions = task.get("prompt", "")

    # Resolve the agent; without one there is nothing to run
    agent = self.agentRegistry.getAgent(requestedAgent)
    if not agent:
        logger.error(f"Agent '{requestedAgent}' not found")
        return []
    displayName = agent.label

    # Announce which documents this step is supposed to produce
    plannedOutputs = task.get("outputDocuments", [])
    targetLabels = [spec.get("label", "unknown") for spec in plannedOutputs]
    self.logAdd(workflow, f"Agent {displayName} to create {', '.join(targetLabels)}.", level="info")

    if instructions == "":
        logger.warning("Empty prompt, no task to do")
        return []

    # Output specifications in the shape handed to the agent
    outputSpecs = [
        {"label": spec.get("label"), "description": spec.get("prompt", "")}
        for spec in plannedOutputs
    ]

    # Input documents are prepared before the task object is assembled
    inputDocuments = await self.prepareAgentInputDocuments(task.get('inputDocuments', []), workflow)

    # Standardized task object for the agent as per state machine spec
    agentTask = {
        "taskId": str(uuid.uuid4()),
        "workflowId": workflow.get("id"),
        "prompt": instructions,
        "inputDocuments": inputDocuments,
        "outputSpecifications": outputSpecs,
        "context": {
            "workflowRound": workflow.get("currentRound", 1),
            "agentType": requestedAgent,
            "timestamp": datetime.now().isoformat(),
            "language": self.mydom.userLanguage  # Pass language to agent
        }
    }

    try:
        logger.debug("TASK: "+self.parseJson2text(agentTask))
        logger.debug(f"Agent '{requestedAgent}' AI service available: {agent.mydom is not None}")

        agentResults = await agent.processTask(agentTask)

        logger.debug(f"Agent '{requestedAgent}' completed task. RESULT: {self.parseJson2text(agentResults)}")

        self.logAdd(
            workflow,
            f"Agent {displayName} completed task. Feedback: {agentResults.get('feedback', 'No feedback provided')}",
            level="info"
        )

        # Store produced files; their IDs plus the feedback form the message input
        messageInput = {
            "prompt": agentResults.get("feedback", ""),
            "listFileId": self.saveAgentDocuments(agentResults)
        }

        agentMessage = await self.chatMessageToWorkflow("assistant", agent, messageInput, workflow)
        agentMessage["status"] = "step"  # As per state machine specification
        logger.debug(f"Agent result = {self.parseJson2text(agentMessage)}.")

        return agentMessage.get("documents", [])

    except Exception as e:
        errorMsg = f"Error executing agent '{displayName}': {str(e)}"
        logger.error(errorMsg, exc_info=True)  # exc_info=True to get the full traceback
        self.logAdd(workflow, errorMsg, level="error")
        return []
|
|
|
|
async def generateFinalMessage(self, objUserResponse: str, objFinalDocuments: List[str], objResults: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Builds the closing message that compares promised and delivered documents (State 6: Final Response Generation).

    Args:
        objUserResponse: Initial text response to the user
        objFinalDocuments: List of expected response documents
        objResults: List of generated result documents

    Returns:
        Assistant message whose content is the AI-written closing feedback;
        its "documents" list is always empty
    """
    # For every promised label, take the first result document with that filename
    deliveredDocs = []
    if len(objFinalDocuments) > 0:
        for promisedLabel in objFinalDocuments:
            namedResults = ((self.getFilename(candidate), candidate) for candidate in objResults)
            hit = next((pair for pair in namedResults if pair[0] == promisedLabel), None)
            if hit is None:
                continue
            hitName, hitDoc = hit
            deliveredDocs.append({
                "label": hitName,
                "contentSummary": [part.get("summary", "") for part in hitDoc.get("contents", [])]
            })

    userPrompt = f"""
Give the final short feedback to the user with reference to the initial statement (objUserResponse). Inform him about the list of filesDelivered. You do not need to send the files, this is handled separately. If in the list of filesDelivered some files_promised would be missing, just give a comment on this, otherwise task is now completed successfully.

Here the data:
objUserResponse = {self.parseJson2text(objUserResponse)}
filesPromised = {self.parseJson2text(objFinalDocuments)}
filesDelivered = {self.parseJson2text(deliveredDocs)}
"""

    # Use the mydom for language-aware AI calls
    finalPrompt = await self.mydom.callAi(
        [
            {"role": "system", "content": "You are a project manager, who delivers results to a user."},
            {"role": "user", "content": userPrompt},
        ],
        produceUserAnswer=True,
    )

    logger.debug(f"FINAL PROMPT = {self.parseJson2text(finalPrompt)}.")
    finalMessage = {
        "role": "assistant",
        "agentName": "Project Manager",
        "content": finalPrompt,
        "documents": []  # DO NOT include the results documents, already with agents
    }

    logger.debug(f"FINAL MESSAGE = {self.parseJson2text(finalMessage)}.")
    return finalMessage
|
|
|
|
async def workflowSummarize(self, workflow: Dict[str, Any], messageUser: Dict[str, Any]) -> str:
    """
    Summarizes the conversation so far, leaving out the current user message.

    Args:
        workflow: Workflow object
        messageUser: Current user message

    Returns:
        Per-message summaries in ascending sequenceNo order, separated by
        blank lines; an empty string when the workflow has no messages
    """
    hasMessages = bool(workflow) and "messages" in workflow and bool(workflow["messages"])
    if not hasMessages:
        return ""  # First message

    # Chronological order
    ordered = sorted(workflow["messages"], key=lambda entry: entry.get("sequenceNo", 0))

    # Summaries are awaited one after another, in order
    currentId = messageUser["id"]
    summaries = [
        await self.messageSummarize(entry)
        for entry in ordered
        if entry["id"] != currentId
    ]
    return "\n\n".join(summaries)
|
|
|
|
async def messageSummarize(self, message: Dict[str, Any]) -> str:
    """
    Creates a summary of a message including its documents.

    The content is summarized through mydom.callAi(); if that call raises,
    the first 200 characters of the content are used instead.

    Args:
        message: Message to summarize

    Returns:
        "[<role> <agentName>]: <summary>", followed by a bulleted
        "Documents:" list when the message carries documents
    """
    role = message.get("role", "undefined")
    agentName = message.get("agentName", "")
    # A stored None content must not break the fallback slice below
    content = message.get("content") or ""

    try:
        # Use the mydom for language-aware AI calls
        contentSummary = await self.mydom.callAi([
            {"role": "system", "content": "You are a chat message summarizer. Create a very concise summary (2-3 sentences, maximum 300 characters)"},
            {"role": "user", "content": content}
        ])
    except Exception as e:
        logger.error(f"Error creating summary: {str(e)}")
        # Ellipsis only when something was actually cut off
        contentSummary = content[:200] + ("..." if len(content) > 200 else "")

    # Summarize documents: every filename gets its own bullet line
    docNames = [self.getFilename(doc) for doc in message.get("documents") or []]
    docsSummary = "\nDocuments:\n- " + "\n- ".join(docNames) if docNames else ""

    return f"[{role} {agentName}]: {contentSummary}{docsSummary}"
|
|
|
|
async def chatMessageToWorkflow(self, role: str, agent: Optional[Any], chatMessage: Dict[str, Any], workflow: Dict[str, Any]) -> Dict[str, Any]:
    """
    Integrates user inputs into a Message object including files with complete contents (State 3: User Message Processing).

    Args:
        role: Role of the message sender ('user' or 'assistant')
        agent: Agent object (its .name and .label are read) if the message
            comes from an agent, otherwise None
        chatMessage: Input data with "prompt"=str, "listFileId"=[]
        workflow: Current workflow object

    Returns:
        Message object as returned by messageAdd(), with content and
        documents including contents
    """
    agentName = "" if agent is None else agent.name
    agentLabel = "" if agent is None else agent.label

    # "or []" also covers an explicit None, which would break len() and the file loop
    additionalFileIds = chatMessage.get("listFileId") or []
    logger.info(f"Message from {role} {agentName} sent with {len(additionalFileIds)} documents")
    logger.debug(f"message = {self.parseJson2text(chatMessage)}.")

    # Check message content; a dict prompt is unwrapped to its "content" entry
    messageContent = chatMessage.get("prompt", "")
    if isinstance(messageContent, dict) and "content" in messageContent:
        messageContent = messageContent["content"]

    # An empty user message is replaced by a placeholder text.
    # The isinstance guard keeps non-string content from crashing on .strip().
    isBlank = messageContent is None or (isinstance(messageContent, str) and messageContent.strip() == "")
    if role == "user" and isBlank:
        logger.warning("Empty message, no chat")
        messageContent = "(No user input received)"

    # Process additional files with complete contents
    additionalFiles = await self.processFileIds(additionalFileIds)

    # Create message object
    messageObject = {
        "role": role,
        "agentName": agentLabel,
        "content": messageContent,
        "documents": additionalFiles,
        "status": chatMessage.get("status", "")
    }

    messageObject = self.messageAdd(workflow, messageObject)
    logger.debug(f"message_user = {self.parseJson2text(messageObject)}.")
    return messageObject
|
|
|
|
async def processFileIds(self, fileIds: List[int]) -> List[Dict[str, Any]]:
    """
    Processes a list of File-IDs and returns the corresponding file objects as a list of Document objects.
    Loads all contents directly and adds summaries to each content item.

    Files that cannot be found, belong to another mandate, have no data or
    raise an error while being processed are skipped; the remaining files
    are still returned.

    Args:
        fileIds: List of file IDs

    Returns:
        List of Document objects with contents, summaries, and base64Encoded flags
    """
    documents = []
    logger.info(f"Processing {len(fileIds)} files")

    for fileId in fileIds:
        try:
            # Check if the file exists
            file = self.mydom.getFile(fileId)
            if not file:
                logger.warning(f"File with ID {fileId} not found")
                continue

            # Check if file belongs to the current mandate
            if file.get("mandateId") != self.mandateId:
                logger.warning(f"File {fileId} does not belong to mandate {self.mandateId}")
                continue

            # Load file content
            fileContent = self.mydom.getFileData(fileId)
            if fileContent is None:
                logger.warning(f"No content found for file with ID {fileId}")
                continue

            # Determine if file is text or binary based on MIME type
            mimeType = file.get("mimeType", "application/octet-stream")
            isTextFormat = isTextMimeType(mimeType)

            # Prefer the flag stored with the file data; fall back to the
            # MIME-type based guess for records that have no flag
            fileDataEntries = self.mydom.db.getRecordset("fileData", recordFilter={"id": fileId})
            if fileDataEntries and "base64Encoded" in fileDataEntries[0]:
                base64Encoded = fileDataEntries[0]["base64Encoded"]
            else:
                base64Encoded = not isTextFormat

            if base64Encoded:
                # NOTE(review): the previous comment here said the data is
                # "already base64 encoded in database", yet it is encoded
                # (again) at this point. This is only correct if getFileData()
                # returns decoded raw bytes - confirm against getFileData().
                encodedData = base64.b64encode(fileContent).decode('utf-8')
            elif isinstance(fileContent, bytes):
                # Text file delivered as bytes - convert to string
                try:
                    encodedData = fileContent.decode('utf-8')
                except UnicodeDecodeError:
                    # Failed to decode as text, use base64
                    encodedData = base64.b64encode(fileContent).decode('utf-8')
                    base64Encoded = True
            else:
                # Already a string
                encodedData = fileContent

            # Split the file name once; a missing name yields "noname"/"bin"
            # instead of aborting the whole file
            stem, suffix = os.path.splitext(file.get("name") or "")
            document = {
                "id": f"doc_{str(uuid.uuid4())}",
                "fileId": fileId,
                "name": stem if stem else "noname",
                "ext": suffix[1:] if suffix else "bin",
                "mimeType": mimeType,
                "data": encodedData,
                "base64Encoded": base64Encoded,
                "metadata": {
                    "isText": isTextFormat,
                    "base64Encoded": base64Encoded  # For backward compatibility
                },
                "contents": []
            }

            # Extract contents
            contents = getDocumentContents(file, fileContent)

            for content in contents:
                # Ensure the base64Encoded flag is set BEFORE summarizing:
                # getContentExtraction() reads it to decide how to treat the data
                if "base64Encoded" not in content:
                    contentMeta = content.get("metadata", {})
                    content["base64Encoded"] = contentMeta.get("base64Encoded", not contentMeta.get("isText", False))

                # Add summary to the content item
                content["summary"] = await self.getContentExtraction(content)

            document["contents"] = contents

            logger.info(f"File {file.get('name', 'unnamed')} (ID: {fileId}) loaded with {len(contents)} contents and summaries")
            documents.append(document)

        except Exception as e:
            # Continue with remaining files instead of failing
            logger.error(f"Error processing file {fileId}: {str(e)}", exc_info=True)

    return documents
|
|
|
|
async def prepareAgentInputDocuments(self, docInputList: List[Dict[str, Any]], workflow: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Prepares the input documents for an agent.

    Every requested document is looked up in the workflow messages, newest
    message first. A given fileId takes precedence over the label: only
    when no document carries that fileId is the label used, in which case
    the newest document with that filename wins.

    Args:
        docInputList: List of required input documents as specified by the project manager
        workflow: Workflow object

    Returns:
        Documents processed by processDocumentForAgent(), in the order of
        docInputList; requested documents that were not found are left out
    """
    preparedInputs = []

    # Sort workflow messages by sequence number (descending)
    sortedMessages = sorted(
        workflow.get("messages", []),
        key=lambda m: m.get("sequenceNo", 0),
        reverse=True
    )
    # Flatten once: newest message first, documents in message order
    candidates = [doc for message in sortedMessages for doc in message.get("documents", [])]

    def matchesFileId(doc: Dict[str, Any], wantedId: Any) -> bool:
        # Documents without a fileId never match. The string comparison
        # tolerates an ID that came back from the model's JSON as text.
        ownId = doc.get("fileId")
        if ownId is None:
            return False
        return ownId == wantedId or str(ownId) == str(wantedId)

    for docSpec in docInputList:
        docFilename = docSpec.get("label") or ""
        docFileId = docSpec.get("fileId", "")
        if docFileId is None:
            # An explicit null means "no fileId given"
            docFileId = ""

        foundDoc = None
        # 1. An explicit fileId identifies the document unambiguously
        if docFileId != "":
            foundDoc = next((doc for doc in candidates if matchesFileId(doc, docFileId)), None)
        # 2. Otherwise fall back to the newest document with that filename
        if foundDoc is None and docFilename != "":
            foundDoc = next((doc for doc in candidates if self.getFilename(doc) == docFilename), None)

        if foundDoc is not None:
            # Process document for agent based on the specification
            processedDoc = await self.processDocumentForAgent(foundDoc, docSpec)
            preparedInputs.append(processedDoc)
        else:
            logger.warning(f"Document with label '{docFilename}', fileId '{docFileId}' not found in workflow")

    return preparedInputs
|
|
|
|
async def processDocumentForAgent(self, document: Dict[str, Any], docSpec: Dict[str, Any]) -> Dict[str, Any]:
    """
    Processes a document for an agent based on the document specification.

    Every content item (optionally restricted to the part named by
    docSpec["contentPart"]) is copied, enriched with "dataExtracted" obtained
    from getContentExtraction, and flagged with metadata["aiProcessed"] = True.
    The input document, its content items and their metadata dictionaries are
    left unmodified; a content item without usable metadata receives a fresh
    metadata dictionary.

    Args:
        document: The document to process
        docSpec: The document specification from the project manager

    Returns:
        Copy of the document whose "contents" hold the AI-extracted content.
        A document without a "contents" key is returned as a plain copy.
    """
    processedDoc = document.copy()
    if "contents" not in processedDoc:
        return processedDoc

    # Both values are the same for every content item, so read them once
    partSpec = docSpec.get("contentPart", "")
    extractionPrompt = docSpec.get("prompt", "Extract the relevant information from this document")

    processedContents = []
    for content in processedDoc["contents"]:
        # A missing, None or empty part specification selects all parts
        if partSpec and partSpec != content.get("name"):
            continue

        processedContent = content.copy()
        processedContent["dataExtracted"] = await self.getContentExtraction(content, extractionPrompt)

        # content.copy() is shallow: copy the metadata too, so that flagging the
        # processed item does not leak into the original document
        sourceMetadata = content.get("metadata")
        metadata = dict(sourceMetadata) if isinstance(sourceMetadata, dict) else {}
        metadata["aiProcessed"] = True
        processedContent["metadata"] = metadata

        processedContents.append(processedContent)

    processedDoc["contents"] = processedContents
    return processedDoc
|
|
|
|
async def getContentExtraction(self, content: Dict[str, Any], prompt: str = None) -> str:
    """
    Extracts or summarizes a single content item, choosing the AI call by encoding.

    Content flagged with "base64Encoded" is handed to callAi4Image together with
    its MIME type; all other content is sent as text to callAi. Failures are not
    raised: they are logged and reported through the returned string.

    Args:
        content: Content item to analyze
        prompt: Custom prompt for extraction (default prompts used if not provided)

    Returns:
        Extracted or summarized content as text, or an error description
    """
    try:
        payload = content.get("data", "")
        encoded = content.get("base64Encoded", False)

        # A caller-supplied prompt serves both branches; otherwise each branch has its own default
        customPrompt = prompt is not None
        textPrompt = prompt if customPrompt else "Create a very concise summary (1-2 sentences, maximum 200 characters) about this content."
        imagePrompt = prompt if customPrompt else "Create a very concise summary (1-2 sentences, maximum 200 characters) about this image."

        if not encoded:
            # Plain (non-base64) content goes through the text model
            return await self.mydom.callAi([
                {"role": "system", "content": "You are a content analyzer. Extract relevant information from the provided content."},
                {"role": "user", "content": f"{textPrompt}\n\nContent:\n{payload}"}
            ], produceUserAnswer=True)

        try:
            # The base64 string is forwarded as is, without decoding
            return await self.mydom.callAi4Image(payload, content.get("mimeType", "application/octet-stream"), imagePrompt)
        except Exception as e:
            logger.error(f"Error processing base64 content: {str(e)}")
            return f"Error processing content: {str(e)}"

    except Exception as e:
        logger.error(f"Error processing content: {str(e)}")
        return f"Error processing content: {str(e)}"
|
|
|
|
def messageAdd(self, workflow: Dict[str, Any], message: Dict[str, Any]) -> Dict[str, Any]:
    """
    Appends a message to the workflow, persists it and refreshes lastActivity.

    The message is completed in place (ID, workflow ID, timestamps, sequence
    number and defaults for role, agentName and status), added to
    workflow["messages"] and workflow["messageIds"], stored through
    createWorkflowMessage, and finally the workflow references are stored
    through updateWorkflow.

    Args:
        workflow: Workflow object
        message: Message to be saved

    Returns:
        Added message (the same dictionary that was passed in)
    """
    timestamp = datetime.now().isoformat()

    # Create the message list on first use
    messages = workflow.setdefault("messages", [])

    # Keep a caller-provided ID, otherwise generate one
    if "id" not in message:
        message["id"] = f"msg_{uuid.uuid4()}"

    # Workflow ID, timestamps and 1-based position within the workflow
    message.update({
        "workflowId": workflow["id"],
        "startedAt": timestamp,
        "finishedAt": timestamp,
        "sequenceNo": len(messages) + 1,
    })

    # Defaults for fields the caller left out; a message naming an agent counts as assistant output
    message.setdefault("role", "assistant" if message.get("agentName") else "user")
    message.setdefault("agentName", "")
    message.setdefault("status", "step")

    # Register the message and its ID on the in-memory workflow
    messages.append(message)
    workflow.setdefault("messageIds", []).append(message["id"])
    workflow["lastActivity"] = timestamp

    # Persist the message first, then the workflow with its updated references
    self.mydom.createWorkflowMessage(message)
    self.mydom.updateWorkflow(workflow["id"], {
        "lastActivity": timestamp,
        "messageIds": workflow["messageIds"],
    })

    return message
|
|
|
|
def logAdd(self, workflow: Dict[str, Any], message: str, level: str = "info",
           progress: Optional[int] = None) -> str:
    """
    Records a log entry on the workflow, persists it and mirrors it to the logger.

    The entry carries the workflow's current status (defaulting to "running")
    and the system name from GLOBAL_WORKFLOW_LABELS as agentName. Only the
    levels "info", "warning" and "error" are forwarded to the module logger;
    any other level is still stored on the workflow and in the database.

    Args:
        workflow: Workflow object
        message: Log message
        level: Log level (info, warning, error)
        progress: Optional - Progress value (0-100)

    Returns:
        ID of the created log entry
    """
    # Create the log list on first use
    entries = workflow.setdefault("logs", [])

    logId = f"log_{uuid.uuid4()}"
    logEntry = {
        "id": logId,
        "workflowId": workflow["id"],
        "message": message,
        "type": level,
        "timestamp": datetime.now().isoformat(),
        "agentName": GLOBAL_WORKFLOW_LABELS.get("systemName", "unknown"),
        "status": workflow.get("status", "running")
    }

    # Progress is only part of the entry when the caller supplied it
    if progress is not None:
        logEntry["progress"] = progress

    # Keep the entry in memory and in the database
    entries.append(logEntry)
    self.mydom.createWorkflowLog(logEntry)

    # Mirror to the module logger for the supported levels
    emit = {
        "info": logger.info,
        "warning": logger.warning,
        "error": logger.error,
    }.get(level)
    if emit is not None:
        emit(f"Workflow {workflow['id']}: {message}")

    return logId
|
|
|
|
def saveAgentDocuments(self, agentResults: Dict[str, Any]) -> List[int]:
    """
    Saves all documents from agent results as files and returns a list of file IDs.

    Each document provides "name", "ext", "data" and an optional
    "base64Encoded" flag. String data is base64-decoded when flagged (falling
    back to its UTF-8 bytes if decoding fails) or UTF-8 encoded otherwise;
    non-string data is stored as is. Names are made unique within one call by
    appending "_<counter>". The extension is appended with a dot only when it
    is non-empty, matching getFilename. A document that cannot be saved is
    logged and skipped; it never aborts the remaining documents.

    Args:
        agentResults: Dictionary containing agent feedback and documents.
            None, a missing "documents" key or a None value mean "no documents".

    Returns:
        List of file IDs for the saved documents
    """
    fileIds = []
    usedNames = set()  # Filenames already handed out during this call

    # Tolerate absent results and an explicit None instead of crashing on iteration
    documents = (agentResults or {}).get("documents") or []

    for doc in documents:
        try:
            # Extract document data according to LucyDOM model
            name = doc.get("name", "")
            ext = doc.get("ext", "")
            data = doc.get("data", "")
            base64Encoded = doc.get("base64Encoded", False)

            # Skip if no name or data
            if not name or not data:
                logger.warning(f"Skipping document with missing name or data. Name: {name}, Has data: {bool(data)}")
                continue

            # No trailing dot for documents without an extension
            suffix = f".{ext}" if ext else ""

            # Ensure unique filename
            baseName = name
            counter = 1
            while f"{baseName}{suffix}" in usedNames:
                baseName = f"{name}_{counter}"
                counter += 1
            fileName = f"{baseName}{suffix}"
            usedNames.add(fileName)

            # Convert content to bytes based on base64Encoded flag
            if not isinstance(data, str):
                # Already bytes
                fileContent = data
            elif base64Encoded:
                try:
                    fileContent = base64.b64decode(data)
                except ValueError as e:
                    # binascii.Error is a ValueError; keep the raw text rather than losing the document
                    logger.warning(f"Failed to decode base64 content: {str(e)}")
                    fileContent = data.encode('utf-8')
                    base64Encoded = False
            else:
                fileContent = data.encode('utf-8')

            # Determine MIME type based on extension
            mimeType = self.mydom.getMimeType(fileName)

            # Create file metadata
            fileMeta = self.mydom.createFile(
                name=baseName,
                mimeType=mimeType,
                size=len(fileContent)
            )

            if not fileMeta or "id" not in fileMeta:
                logger.warning(f"Failed to create file metadata for '{fileName}'")
                continue

            # Save file content
            if self.mydom.createFileData(fileMeta["id"], fileContent):
                fileIds.append(fileMeta["id"])
                logger.info(f"Saved document '{fileName}' with file ID: {fileMeta['id']} (base64Encoded: {base64Encoded})")
            else:
                logger.warning(f"Failed to save content for document '{fileName}'")

        except Exception as e:
            # Continue with other documents instead of failing
            logger.error(f"Error saving document from agent results: {str(e)}")
            continue

    return fileIds
|
|
|
|
def getAvailableDocuments(self, workflow: Dict[str, Any], messageUser: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collects every document attached to any message of the workflow.

    Documents belonging to the message whose ID equals messageUser's ID are
    reported with fileSource "user", all others with "workflow". Each entry
    lists one summary record per content item, or a single placeholder record
    when the document has no contents.

    Args:
        workflow: Current workflow object
        messageUser: Current message from the user

    Returns:
        List with information about all available documents, sorted by message sequenceNr in descending order
    """
    availableDocs = []

    for message in workflow.get("messages") or []:
        messageId = message.get("id", "unknown")
        sequenceNr = message.get("sequenceNo", 0)

        # Only the current user message counts as user-provided
        source = "user" if messageId == messageUser.get("id") else "workflow"

        for doc in message.get("documents") or []:
            filename = self.getFilename(doc)
            fileId = doc.get("fileId")

            contents = doc.get("contents") or []
            if contents:
                # One summary record per content item
                contentSummaries = [
                    {
                        "contentPart": content.get("name", "noname"),
                        "metadata": content.get("metadata", ""),
                        "summary": content.get("summary", "No summary"),
                    }
                    for content in contents
                ]
            else:
                # Placeholder for documents without extracted contents
                contentSummaries = [{
                    "contentPart": "1_undefined",
                    "metadata": "",
                    "summary": "No content extracted",
                }]

            availableDocs.append({
                "sequenceNr": sequenceNr,
                "fileSource": source,
                "fileId": fileId,
                "messageId": messageId,
                "label": filename,
                "contentSummaryList": contentSummaries,
            })

    # Newest message first
    availableDocs.sort(key=lambda info: info["sequenceNr"], reverse=True)

    logger.info(f"Available documents: {len(availableDocs)}")
    return availableDocs
|
|
|
|
def agentProfiles(self) -> List[Dict[str, Any]]:
    """
    Returns the agent descriptions provided by the agent registry.

    Returns:
        Whatever the registry's getAgentInfos() yields: information about all available agents
    """
    registry = self.agentRegistry
    return registry.getAgentInfos()
|
|
|
|
def getFilename(self, document: Dict[str, Any]) -> str:
    """
    Builds the filename of a document from its "name" and "ext" entries.

    Args:
        document: Document object

    Returns:
        "<name>.<ext>" when an extension is set, otherwise just the name
        ("unnamed" when the document has no "name" key)
    """
    baseName = document.get("name", "unnamed")
    extension = document.get("ext", "")
    # No extension: return the bare name without a trailing dot
    return f"{baseName}.{extension}" if extension else baseName
|
|
|
|
def parseJson2text(self, jsonObj: Any) -> str:
    """
    Renders a JSON-compatible object as indented, human-readable text.

    Args:
        jsonObj: JSON object to convert

    Returns:
        The object dumped with two-space indentation and unescaped non-ASCII
        characters; "No data available" for any falsy object; str(jsonObj)
        when the object cannot be serialized
    """
    if jsonObj:
        try:
            # Indented output for better readability
            return json.dumps(jsonObj, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error in JSON conversion: {str(e)}")
            return str(jsonObj)

    return "No data available"
|
|
|
|
def parseJsonResponse(self, responseText: str) -> Dict[str, Any]:
    """
    Parses the JSON payload contained in a text.

    The text may surround the JSON with other content (explanations, code
    fences). Decoding starts at the first "{" and stops at the end of that
    JSON value, so trailing text is ignored even when it contains further
    braces. If that fails, or the text contains no "{", the whole text is
    parsed as JSON.

    Args:
        responseText: Text with JSON content

    Returns:
        Parsed JSON data. When nothing can be parsed, or responseText is not a
        string, a fallback structure with empty "objFinalDocuments" and
        "objWorkplan", an apology in "objUserResponse" and "userLanguage" "en"
        is returned instead of raising.
    """
    # Fallback: empty structure
    fallback = {
        "objFinalDocuments": [],
        "objWorkplan": [],
        "objUserResponse": "Sorry, I could not parse your data.",
        "userLanguage": "en"
    }

    if not isinstance(responseText, str):
        logger.error(f"JSON parsing error: expected text, got {type(responseText).__name__}")
        return fallback

    # Extract JSON from the text (if mixed with other content). raw_decode
    # reads exactly one JSON value and reports where it ended, so text after
    # the payload cannot break the parse.
    jsonStart = responseText.find('{')
    if jsonStart >= 0:
        try:
            parsed, _ = json.JSONDecoder().raw_decode(responseText, jsonStart)
            return parsed
        except json.JSONDecodeError:
            pass  # fall through and try the entire text

    try:
        # Try to parse the entire text
        return json.loads(responseText)
    except json.JSONDecodeError as e:
        logger.error(f"JSON parsing error: {str(e)}")
        return fallback
|
|
|
|
|
|
# Singleton factory for the WorkflowManager
# Module-level cache of WorkflowManager instances, keyed by the
# "{mandateId}_{userId}" context key that getWorkflowManager() builds.
_workflowManagers = {}
# Time of the most recent getWorkflowManager() call per context key;
# getWorkflowManager() compares it against a one-hour threshold to drop idle instances.
_workflowManagerLastAccess = {}  # Track last access time for cleanup
|
|
|
|
def getWorkflowManager(mandateId: int = 0, userId: int = 0) -> WorkflowManager:
    """
    Returns a WorkflowManager for the specified context.

    Instances are cached per "{mandateId}_{userId}" key and reused. On every
    call, contexts that have not been requested for more than one hour are
    evicted. The access time of the requested context is recorded only after
    its manager exists, so a failing WorkflowManager constructor leaves no
    bookkeeping entry behind.

    Args:
        mandateId: ID of the mandate
        userId: ID of the user

    Returns:
        WorkflowManager instance
    """
    contextKey = f"{mandateId}_{userId}"
    now = datetime.now()

    # Cleanup old instances (older than 1 hour). Iterating the timestamp table
    # rather than the manager table also removes timestamps without a manager.
    cleanupThreshold = now - timedelta(hours=1)
    for key, lastAccess in list(_workflowManagerLastAccess.items()):
        # The requested context is being used right now and is never evicted
        if key != contextKey and lastAccess < cleanupThreshold:
            _workflowManagers.pop(key, None)
            del _workflowManagerLastAccess[key]

    manager = _workflowManagers.get(contextKey)
    if manager is None:
        manager = WorkflowManager(mandateId, userId)
        _workflowManagers[contextKey] = manager

    # Update last access time
    _workflowManagerLastAccess[contextKey] = now
    return manager
|
|
|
|
def cleanupWorkflowManager(mandateId: int, userId: int) -> None:
    """
    Drops the cached WorkflowManager of one context together with its access time.

    Does nothing for a context that is not cached.

    Args:
        mandateId: ID of the mandate
        userId: ID of the user
    """
    contextKey = f"{mandateId}_{userId}"
    # Both tables are keyed identically; pop() tolerates a missing key
    for table in (_workflowManagers, _workflowManagerLastAccess):
        table.pop(contextKey, None)