898 lines
42 KiB
Python
898 lines
42 KiB
Python
from typing import Dict, Any, List, Optional
|
|
import logging
|
|
import uuid
|
|
import asyncio
|
|
import json
|
|
|
|
from modules.datamodels.datamodelChat import (
|
|
UserInputRequest,
|
|
ChatMessage,
|
|
ChatWorkflow,
|
|
ChatDocument,
|
|
WorkflowModeEnum
|
|
)
|
|
from modules.datamodels.datamodelChat import TaskContext
|
|
from modules.workflows.processing.workflowProcessor import WorkflowProcessor
|
|
from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class WorkflowManager:
|
|
"""Manager for workflow processing and coordination"""
|
|
|
|
def __init__(self, services):
|
|
self.services = services
|
|
self.workflowProcessor = None
|
|
|
|
# Exported functions
|
|
|
|
async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow:
|
|
"""Starts a new workflow or continues an existing one, then launches processing."""
|
|
try:
|
|
# Debug log to check workflowMode parameter
|
|
logger.info(f"WorkflowManager received workflowMode: {workflowMode}")
|
|
currentTime = self.services.utils.timestampGetUtc()
|
|
|
|
if workflowId:
|
|
workflow = self.services.chat.getWorkflow(workflowId)
|
|
if not workflow:
|
|
raise ValueError(f"Workflow {workflowId} not found")
|
|
|
|
# Store workflow in services for reference (this is the ChatWorkflow object)
|
|
self.services.workflow = workflow
|
|
|
|
# CRITICAL: Update all method instances to use the current Services object with the correct workflow
|
|
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
|
|
discoverMethods(self.services)
|
|
logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}")
|
|
|
|
if workflow.status == "running":
|
|
logger.info(f"Stopping running workflow {workflowId} before processing new prompt")
|
|
workflow.status = "stopped"
|
|
workflow.lastActivity = currentTime
|
|
self.services.chat.updateWorkflow(workflowId, {
|
|
"status": "stopped",
|
|
"lastActivity": currentTime
|
|
})
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": "Workflow stopped for new prompt",
|
|
"type": "info",
|
|
"status": "stopped",
|
|
"progress": 1.0
|
|
})
|
|
|
|
newRound = workflow.currentRound + 1
|
|
self.services.chat.updateWorkflow(workflowId, {
|
|
"status": "running",
|
|
"lastActivity": currentTime,
|
|
"currentRound": newRound,
|
|
"workflowMode": workflowMode # Update workflow mode for existing workflows
|
|
})
|
|
|
|
# Reflect updates on the in-memory object without reloading
|
|
workflow.status = "running"
|
|
workflow.lastActivity = currentTime
|
|
workflow.currentRound = newRound
|
|
workflow.workflowMode = workflowMode
|
|
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}",
|
|
"type": "info",
|
|
"status": "running",
|
|
"progress": 0
|
|
})
|
|
|
|
else:
|
|
workflowData = {
|
|
"name": "New Workflow",
|
|
"status": "running",
|
|
"startedAt": currentTime,
|
|
"lastActivity": currentTime,
|
|
"currentRound": 1,
|
|
"currentTask": 0,
|
|
"currentAction": 0,
|
|
"totalTasks": 0,
|
|
"totalActions": 0,
|
|
"mandateId": self.services.user.mandateId,
|
|
"messageIds": [],
|
|
"workflowMode": workflowMode,
|
|
"maxSteps": 5 if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC else 1, # Set maxSteps for Dynamic mode
|
|
}
|
|
|
|
workflow = self.services.chat.createWorkflow(workflowData)
|
|
logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}")
|
|
logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}")
|
|
|
|
# Store workflow in services (this is the ChatWorkflow object)
|
|
self.services.workflow = workflow
|
|
|
|
# CRITICAL: Update all method instances to use the current Services object with the correct workflow
|
|
# This ensures cached method instances don't use stale workflow IDs from previous workflows
|
|
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
|
|
discoverMethods(self.services)
|
|
logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}")
|
|
|
|
# Start workflow processing asynchronously
|
|
asyncio.create_task(self._workflowProcess(userInput))
|
|
|
|
return workflow
|
|
except Exception as e:
|
|
logger.error(f"Error starting workflow: {str(e)}")
|
|
raise
|
|
|
|
async def workflowStop(self, workflowId: str) -> ChatWorkflow:
|
|
"""Stops a running workflow."""
|
|
try:
|
|
workflow = self.services.chat.getWorkflow(workflowId)
|
|
if not workflow:
|
|
raise ValueError(f"Workflow {workflowId} not found")
|
|
|
|
# Store workflow in services (this is the ChatWorkflow object)
|
|
self.services.workflow = workflow
|
|
|
|
workflow.status = "stopped"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflowId, {
|
|
"status": "stopped",
|
|
"lastActivity": workflow.lastActivity
|
|
})
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": "Workflow stopped",
|
|
"type": "warning",
|
|
"status": "stopped",
|
|
"progress": 1.0
|
|
})
|
|
return workflow
|
|
except Exception as e:
|
|
logger.error(f"Error stopping workflow: {str(e)}")
|
|
raise
|
|
|
|
# Main processor
|
|
|
|
async def _workflowProcess(self, userInput: UserInputRequest) -> None:
|
|
"""Process a workflow with user input"""
|
|
try:
|
|
# Store the current user prompt in services for easy access throughout the workflow
|
|
self.services.rawUserPrompt = userInput.prompt
|
|
self.services.currentUserPrompt = userInput.prompt
|
|
|
|
# Reset progress logger for new workflow
|
|
self.services.chat._progressLogger = None
|
|
|
|
self.workflowProcessor = WorkflowProcessor(self.services)
|
|
await self._sendFirstMessage(userInput)
|
|
taskPlan = await self._planTasks(userInput)
|
|
await self._executeTasks(taskPlan)
|
|
await self._processWorkflowResults()
|
|
|
|
except WorkflowStoppedException:
|
|
self._handleWorkflowStop()
|
|
|
|
except Exception as e:
|
|
self._handleWorkflowError(e)
|
|
|
|
# Helper functions
|
|
|
|
async def _sendFirstMessage(self, userInput: UserInputRequest) -> None:
|
|
"""Send first message to start workflow"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
checkWorkflowStopped(self.services)
|
|
|
|
# Create initial message using interface
|
|
# For first user message, include round info in the user context label
|
|
roundNum = workflow.currentRound
|
|
contextLabel = f"round{roundNum}_usercontext"
|
|
|
|
messageData = {
|
|
"workflowId": workflow.id,
|
|
"role": "user",
|
|
"message": userInput.prompt,
|
|
"status": "first",
|
|
"sequenceNr": 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": contextLabel,
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "pending",
|
|
"actionProgress": "pending"
|
|
}
|
|
|
|
# Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents
|
|
# SKIP user intention analysis for AUTOMATION mode - it uses predefined JSON plans
|
|
createdDocs = []
|
|
workflowMode = getattr(workflow, 'workflowMode', None)
|
|
skipIntentionAnalysis = (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION)
|
|
|
|
if skipIntentionAnalysis:
|
|
logger.info("Skipping user intention analysis for AUTOMATION mode - using direct user input")
|
|
# For automation mode, use user input directly without AI analysis
|
|
self.services.currentUserPrompt = userInput.prompt
|
|
detectedLanguage = None
|
|
normalizedRequest = None
|
|
intentText = userInput.prompt
|
|
contextItems = []
|
|
else:
|
|
try:
|
|
analyzerPrompt = (
|
|
"You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
|
|
"1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
|
|
"2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
|
|
"3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
|
|
"4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n\n"
|
|
"Rules:\n"
|
|
"- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
|
|
"- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
|
|
"- Preserve critical references (URLs, filenames) in intent.\n"
|
|
"- Normalize to the primary detected language if mixed-language.\n\n"
|
|
"Return ONLY JSON (no markdown) with this shape:\n"
|
|
"{\n"
|
|
" \"detectedLanguage\": \"de|en|fr|it|...\",\n"
|
|
" \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
|
|
" \"intent\": \"Concise normalized request...\",\n"
|
|
" \"contextItems\": [\n"
|
|
" {\n"
|
|
" \"title\": \"User context 1\",\n"
|
|
" \"mimeType\": \"text/plain\",\n"
|
|
" \"content\": \"Full extracted content block here\"\n"
|
|
" }\n"
|
|
" ]\n"
|
|
"}\n\n"
|
|
f"User message:\n{self.services.utils.sanitizePromptContent(userInput.prompt, 'userinput')}"
|
|
)
|
|
|
|
# Call AI analyzer (planning call - will use static parameters)
|
|
aiResponse = await self.services.ai.callAiPlanning(
|
|
prompt=analyzerPrompt,
|
|
placeholders=None,
|
|
debugType="userintention"
|
|
)
|
|
|
|
detectedLanguage = None
|
|
normalizedRequest = None
|
|
intentText = userInput.prompt
|
|
contextItems = []
|
|
|
|
# Parse analyzer response (JSON expected)
|
|
try:
|
|
jsonStart = aiResponse.find('{') if aiResponse else -1
|
|
jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
|
|
if jsonStart != -1 and jsonEnd > jsonStart:
|
|
parsed = json.loads(aiResponse[jsonStart:jsonEnd])
|
|
detectedLanguage = parsed.get('detectedLanguage') or None
|
|
normalizedRequest = parsed.get('normalizedRequest') or None
|
|
if parsed.get('intent'):
|
|
intentText = parsed.get('intent')
|
|
contextItems = parsed.get('contextItems') or []
|
|
except Exception:
|
|
contextItems = []
|
|
|
|
# Update services state
|
|
if detectedLanguage and isinstance(detectedLanguage, str):
|
|
self._setUserLanguage(detectedLanguage)
|
|
try:
|
|
setattr(self.services, 'currentUserLanguage', detectedLanguage)
|
|
except Exception:
|
|
pass
|
|
self.services.currentUserPrompt = intentText or userInput.prompt
|
|
try:
|
|
if normalizedRequest:
|
|
setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
|
|
if contextItems is not None:
|
|
setattr(self.services, 'currentUserContextItems', contextItems)
|
|
except Exception:
|
|
pass
|
|
|
|
# Create documents for context items
|
|
if contextItems and isinstance(contextItems, list):
|
|
for idx, item in enumerate(contextItems):
|
|
try:
|
|
title = item.get('title') if isinstance(item, dict) else None
|
|
mime = item.get('mimeType') if isinstance(item, dict) else None
|
|
content = item.get('content') if isinstance(item, dict) else None
|
|
if not content:
|
|
continue
|
|
fileName = (title or f"user_context_{idx+1}.txt").strip()
|
|
mimeType = (mime or "text/plain").strip()
|
|
|
|
# Neutralize content before storing if neutralization is enabled
|
|
contentBytes = content.encode('utf-8')
|
|
contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType)
|
|
|
|
# Create file in component storage
|
|
fileItem = self.services.interfaceDbComponent.createFile(
|
|
name=fileName,
|
|
mimeType=mimeType,
|
|
content=contentBytes
|
|
)
|
|
# Persist file data
|
|
self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes)
|
|
|
|
# Collect file info
|
|
fileInfo = self.services.chat.getFileInfo(fileItem.id)
|
|
from modules.datamodels.datamodelChat import ChatDocument
|
|
doc = ChatDocument(
|
|
fileId=fileItem.id,
|
|
fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName,
|
|
fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes),
|
|
mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType
|
|
)
|
|
createdDocs.append(doc)
|
|
except Exception:
|
|
continue
|
|
except Exception as e:
|
|
logger.warning(f"Prompt analysis failed or skipped: {str(e)}")
|
|
|
|
# Process user-uploaded documents (fileIds) and combine with context documents
|
|
if userInput.listFileId:
|
|
try:
|
|
userDocs = await self._processFileIds(userInput.listFileId, None)
|
|
if userDocs:
|
|
createdDocs.extend(userDocs)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to process user fileIds: {e}")
|
|
|
|
# Finally, persist and bind the first message with combined documents (context + user)
|
|
self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending first message: {str(e)}")
|
|
raise
|
|
|
|
async def _planTasks(self, userInput: UserInputRequest):
|
|
"""Generate task plan for workflow execution"""
|
|
workflow = self.services.workflow
|
|
handling = self.workflowProcessor
|
|
# Generate task plan first (shared for both modes)
|
|
taskPlan = await handling.generateTaskPlan(userInput.prompt, workflow)
|
|
if not taskPlan or not taskPlan.tasks:
|
|
raise Exception("No tasks generated in task plan.")
|
|
workflowMode = getattr(workflow, 'workflowMode')
|
|
logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}")
|
|
logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks")
|
|
return taskPlan
|
|
|
|
async def _executeTasks(self, taskPlan) -> None:
|
|
"""Execute all tasks in the task plan and update workflow status."""
|
|
workflow = self.services.workflow
|
|
handling = self.workflowProcessor
|
|
totalTasks = len(taskPlan.tasks)
|
|
allTaskResults: List = []
|
|
previousResults: List[str] = []
|
|
|
|
for idx, taskStep in enumerate(taskPlan.tasks):
|
|
currentTaskIndex = idx + 1
|
|
logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}")
|
|
|
|
# Build TaskContext (mode-specific behavior is inside WorkflowProcessor)
|
|
taskContext = TaskContext(
|
|
taskStep=taskStep,
|
|
workflow=workflow,
|
|
workflowId=workflow.id,
|
|
availableDocuments=None,
|
|
availableConnections=None,
|
|
previousResults=previousResults,
|
|
previousHandover=None,
|
|
improvements=[],
|
|
retryCount=0,
|
|
previousActionResults=[],
|
|
previousReviewResult=None,
|
|
isRegeneration=False,
|
|
failurePatterns=[],
|
|
failedActions=[],
|
|
successfulActions=[],
|
|
criteriaProgress={
|
|
'met_criteria': set(),
|
|
'unmet_criteria': set(),
|
|
'attempt_history': []
|
|
}
|
|
)
|
|
|
|
taskResult = await handling.executeTask(taskStep, workflow, taskContext, currentTaskIndex, totalTasks)
|
|
handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow)
|
|
allTaskResults.append({
|
|
'taskStep': taskStep,
|
|
'taskResult': taskResult,
|
|
'handoverData': handoverData
|
|
})
|
|
if taskResult.success and taskResult.feedback:
|
|
previousResults.append(taskResult.feedback)
|
|
|
|
# Mark workflow as completed; error/stop cases update status elsewhere
|
|
workflow.status = "completed"
|
|
return None
|
|
|
|
async def _processWorkflowResults(self) -> None:
|
|
"""Process workflow results based on workflow status and create appropriate messages"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
try:
|
|
checkWorkflowStopped(self.services)
|
|
except WorkflowStoppedException:
|
|
logger.info(f"Workflow {workflow.id} was stopped during result processing")
|
|
|
|
# Create final stopped message
|
|
stoppedMessage = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": "🛑 Workflow stopped by user",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_stopped",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "stopped",
|
|
"actionProgress": "stopped"
|
|
}
|
|
self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, [])
|
|
|
|
# Update workflow status to stopped
|
|
workflow.status = "stopped"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "stopped",
|
|
"lastActivity": workflow.lastActivity
|
|
})
|
|
return
|
|
|
|
if workflow.status == 'stopped':
|
|
# Create stopped message
|
|
stopped_message = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": "🛑 Workflow stopped by user",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_stopped",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "stopped",
|
|
"actionProgress": "stopped"
|
|
}
|
|
self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])
|
|
|
|
# Update workflow status to stopped
|
|
workflow.status = "stopped"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "stopped",
|
|
"lastActivity": workflow.lastActivity,
|
|
"totalTasks": workflow.totalTasks,
|
|
"totalActions": workflow.totalActions
|
|
})
|
|
|
|
# Add stopped log entry
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": "Workflow stopped by user",
|
|
"type": "warning",
|
|
"status": "stopped",
|
|
"progress": 1.0
|
|
})
|
|
return
|
|
elif workflow.status == 'failed':
|
|
# Create error message
|
|
errorMessage = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": f"Workflow failed: {'Unknown error'}",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_failure",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "fail",
|
|
"actionProgress": "fail"
|
|
}
|
|
self.services.chat.storeMessageWithDocuments(workflow, errorMessage, [])
|
|
|
|
# Update workflow status to failed
|
|
workflow.status = "failed"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "failed",
|
|
"lastActivity": workflow.lastActivity,
|
|
"totalTasks": workflow.totalTasks,
|
|
"totalActions": workflow.totalActions
|
|
})
|
|
|
|
# Add failed log entry
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": "Workflow failed: Unknown error",
|
|
"type": "error",
|
|
"status": "failed",
|
|
"progress": 1.0
|
|
})
|
|
return
|
|
|
|
# For successful workflows, send detailed completion message
|
|
await self._sendLastMessage()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing workflow results: {str(e)}")
|
|
# Create error message
|
|
error_message = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": f"Error processing workflow results: {str(e)}",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_error",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "fail",
|
|
"actionProgress": "fail"
|
|
}
|
|
self.services.chat.storeMessageWithDocuments(workflow, error_message, [])
|
|
|
|
# Update workflow status to failed
|
|
workflow.status = "failed"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "failed",
|
|
"lastActivity": workflow.lastActivity,
|
|
"totalTasks": workflow.totalTasks,
|
|
"totalActions": workflow.totalActions
|
|
})
|
|
|
|
async def _sendLastMessage(self) -> None:
|
|
"""Send last message to complete workflow (only for successful workflows)"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
# Safety check: ensure this is only called for successful workflows
|
|
if workflow.status in ['stopped', 'failed']:
|
|
logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}")
|
|
return
|
|
|
|
# Generate feedback
|
|
feedback = await self._generateWorkflowFeedback()
|
|
|
|
# Create last message using interface
|
|
messageData = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": feedback,
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_feedback",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "success",
|
|
"actionProgress": "success"
|
|
}
|
|
|
|
# Create message using interface
|
|
self.services.chat.storeMessageWithDocuments(workflow, messageData, [])
|
|
|
|
# Update workflow status to completed
|
|
workflow.status = "completed"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
|
|
# Update workflow in database
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "completed",
|
|
"lastActivity": workflow.lastActivity
|
|
})
|
|
|
|
# Add completion log entry
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": "Workflow completed",
|
|
"type": "success",
|
|
"status": "completed",
|
|
"progress": 1.0
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending last message: {str(e)}")
|
|
raise
|
|
|
|
async def _generateWorkflowFeedback(self) -> str:
|
|
"""Generate feedback message for workflow completion"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
checkWorkflowStopped(self.services)
|
|
|
|
# Count messages by role
|
|
userMessages = [msg for msg in workflow.messages if msg.role == 'user']
|
|
assistantMessages = [msg for msg in workflow.messages if msg.role == 'assistant']
|
|
|
|
# Generate summary feedback
|
|
feedback = f"Workflow completed.\n\n"
|
|
feedback += f"Processed {len(userMessages)} user inputs and generated {len(assistantMessages)} responses.\n"
|
|
|
|
# Add final status
|
|
if workflow.status == "completed":
|
|
feedback += "All tasks completed successfully."
|
|
elif workflow.status == "partial":
|
|
feedback += "Some tasks completed with partial success."
|
|
else:
|
|
feedback += f"Workflow status: {workflow.status}"
|
|
|
|
return feedback
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generating workflow feedback: {str(e)}")
|
|
return "Workflow processing completed."
|
|
|
|
def _handleWorkflowStop(self) -> None:
|
|
"""Handle workflow stop exception"""
|
|
workflow = self.services.workflow
|
|
logger.info("Workflow stopped by user")
|
|
|
|
# Update workflow status to stopped
|
|
workflow.status = "stopped"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "stopped",
|
|
"lastActivity": workflow.lastActivity,
|
|
"totalTasks": workflow.totalTasks,
|
|
"totalActions": workflow.totalActions
|
|
})
|
|
|
|
# Create final stopped message
|
|
stopped_message = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": "🛑 Workflow stopped by user",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_stopped",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "pending",
|
|
"actionProgress": "pending"
|
|
}
|
|
self.services.chat.storeMessageWithDocuments(workflow, stopped_message, [])
|
|
|
|
# Add log entry
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": "Workflow stopped by user",
|
|
"type": "warning",
|
|
"status": "stopped",
|
|
"progress": 1.0
|
|
})
|
|
|
|
def _handleWorkflowError(self, error: Exception) -> None:
|
|
"""Handle workflow error exception"""
|
|
workflow = self.services.workflow
|
|
logger.error(f"Workflow processing error: {str(error)}")
|
|
|
|
# Update workflow status to failed
|
|
workflow.status = "failed"
|
|
workflow.lastActivity = self.services.utils.timestampGetUtc()
|
|
self.services.chat.updateWorkflow(workflow.id, {
|
|
"status": "failed",
|
|
"lastActivity": workflow.lastActivity,
|
|
"totalTasks": workflow.totalTasks,
|
|
"totalActions": workflow.totalActions
|
|
})
|
|
|
|
# Create error message
|
|
error_message = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": f"Workflow processing failed: {str(error)}",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": self.services.utils.timestampGetUtc(),
|
|
"documentsLabel": "workflow_error",
|
|
"documents": [],
|
|
# Add workflow context fields
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
# Add progress status
|
|
"taskProgress": "fail",
|
|
"actionProgress": "fail"
|
|
}
|
|
self.services.chat.storeMessageWithDocuments(workflow, error_message, [])
|
|
|
|
# Add error log entry
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": f"Workflow failed: {str(error)}",
|
|
"type": "error",
|
|
"status": "failed",
|
|
"progress": 1.0
|
|
})
|
|
|
|
raise
|
|
|
|
async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]:
|
|
"""Process file IDs from existing files and return ChatDocument objects.
|
|
If neutralization is enabled, files are neutralized and new files are created with neutralized content.
|
|
If neutralization fails, the document is not included and an error is logged to ChatLog."""
|
|
documents = []
|
|
|
|
# Check if neutralization is enabled
|
|
neutralizationEnabled = False
|
|
try:
|
|
config = self.services.neutralization.getConfig()
|
|
neutralizationEnabled = config and config.enabled
|
|
except Exception as e:
|
|
logger.debug(f"Could not check neutralization config: {str(e)}")
|
|
|
|
workflow = self.services.workflow
|
|
|
|
for fileId in fileIds:
|
|
try:
|
|
# Get file info from chat service
|
|
fileInfo = self.services.chat.getFileInfo(fileId)
|
|
if not fileInfo:
|
|
logger.warning(f"No file info found for file ID {fileId}")
|
|
continue
|
|
|
|
originalFileName = fileInfo.get("fileName", "unknown")
|
|
originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
|
|
fileIdToUse = fileId
|
|
fileNameToUse = originalFileName
|
|
fileSizeToUse = fileInfo.get("size", 0)
|
|
neutralizationFailed = False
|
|
|
|
# Neutralize file if enabled
|
|
if neutralizationEnabled:
|
|
try:
|
|
# Neutralize the file using the neutralization service
|
|
neutralizationResult = self.services.neutralization.processFile(fileId)
|
|
|
|
# Check if file is binary (not neutralized)
|
|
if neutralizationResult.get('is_binary', False):
|
|
# Binary file - log INFO and use original file
|
|
infoMsg = f"File '{originalFileName}' (MIME type: {neutralizationResult.get('mime_type', 'unknown')}) is a binary file. Binary file neutralization will be implemented in the future. Using original file without neutralization."
|
|
logger.info(infoMsg)
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": infoMsg,
|
|
"type": "info",
|
|
"status": "running",
|
|
"progress": 50
|
|
})
|
|
# Use original file (fileIdToUse already set to fileId)
|
|
elif neutralizationResult and 'neutralized_text' in neutralizationResult:
|
|
neutralizedText = neutralizationResult['neutralized_text']
|
|
|
|
# Create new file with neutralized content
|
|
neutralizedFileName = neutralizationResult.get('neutralized_file_name', f"neutralized_{originalFileName}")
|
|
neutralizedContentBytes = neutralizedText.encode('utf-8')
|
|
|
|
# Create file in component storage
|
|
neutralizedFileItem = self.services.interfaceDbComponent.createFile(
|
|
name=neutralizedFileName,
|
|
mimeType=originalMimeType,
|
|
content=neutralizedContentBytes
|
|
)
|
|
# Persist file data
|
|
self.services.interfaceDbComponent.createFileData(neutralizedFileItem.id, neutralizedContentBytes)
|
|
|
|
# Use the neutralized file ID and actual size
|
|
fileIdToUse = neutralizedFileItem.id
|
|
fileNameToUse = neutralizedFileName
|
|
fileSizeToUse = len(neutralizedContentBytes)
|
|
|
|
logger.info(f"Neutralized file {fileId} -> {fileIdToUse} ({fileNameToUse})")
|
|
else:
|
|
neutralizationFailed = True
|
|
errorMsg = f"Neutralization did not return neutralized_text for file '{originalFileName}' (ID: {fileId})"
|
|
logger.warning(errorMsg)
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": errorMsg,
|
|
"type": "error",
|
|
"status": "error",
|
|
"progress": -1
|
|
})
|
|
except Exception as e:
|
|
neutralizationFailed = True
|
|
errorMsg = f"Failed to neutralize file '{originalFileName}' (ID: {fileId}): {str(e)}"
|
|
logger.error(errorMsg)
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": errorMsg,
|
|
"type": "error",
|
|
"status": "error",
|
|
"progress": -1
|
|
})
|
|
|
|
# Only skip document if neutralization failed (not for binary files)
|
|
if not neutralizationFailed:
|
|
# Create document with file ID (neutralized or original)
|
|
document = ChatDocument(
|
|
id=str(uuid.uuid4()),
|
|
messageId=messageId or "",
|
|
fileId=fileIdToUse,
|
|
fileName=fileNameToUse,
|
|
fileSize=fileSizeToUse,
|
|
mimeType=originalMimeType
|
|
)
|
|
documents.append(document)
|
|
logger.info(f"Processed file ID {fileId} -> {document.fileName} (using fileId: {fileIdToUse})")
|
|
else:
|
|
logger.warning(f"Skipping document for file ID {fileId} due to neutralization failure")
|
|
except Exception as e:
|
|
errorMsg = f"Error processing file ID {fileId}: {str(e)}"
|
|
logger.error(errorMsg)
|
|
self.services.chat.storeLog(workflow, {
|
|
"message": errorMsg,
|
|
"type": "error",
|
|
"status": "error",
|
|
"progress": -1
|
|
})
|
|
return documents
|
|
|
|
|
|
def _setUserLanguage(self, language: str) -> None:
|
|
"""Set user language for the service center"""
|
|
self.services.user.language = language
|
|
|
|
async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes:
|
|
"""Neutralize content if neutralization is enabled in user settings"""
|
|
try:
|
|
# Check if neutralization is enabled
|
|
config = self.services.neutralization.getConfig()
|
|
if not config or not config.enabled:
|
|
return contentBytes
|
|
|
|
# Decode content to text for neutralization
|
|
try:
|
|
textContent = contentBytes.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
# Try alternative encodings
|
|
for enc in ['latin-1', 'cp1252', 'iso-8859-1']:
|
|
try:
|
|
textContent = contentBytes.decode(enc)
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
else:
|
|
# If unable to decode, return original bytes (binary content)
|
|
logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}")
|
|
return contentBytes
|
|
|
|
# Neutralize the text content
|
|
# Note: The neutralization service should use names from config when processing
|
|
result = self.services.neutralization.processText(textContent)
|
|
if result and 'neutralized_text' in result:
|
|
neutralizedText = result['neutralized_text']
|
|
# Encode back to bytes using the same encoding
|
|
try:
|
|
return neutralizedText.encode('utf-8')
|
|
except Exception as e:
|
|
logger.warning(f"Error encoding neutralized text: {str(e)}")
|
|
return contentBytes
|
|
else:
|
|
logger.warning("Neutralization did not return neutralized_text")
|
|
return contentBytes
|
|
except Exception as e:
|
|
logger.error(f"Error during content neutralization: {str(e)}")
|
|
# Return original content on error
|
|
return contentBytes
|