887 lines
42 KiB
Python
887 lines
42 KiB
Python
# modeReact.py
|
|
# React mode implementation for workflows
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Any
|
|
from modules.datamodels.datamodelChat import (
|
|
TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus,
|
|
ActionResult
|
|
)
|
|
from modules.datamodels.datamodelChat import ChatWorkflow
|
|
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, ProcessingMode, Priority
|
|
from modules.workflows.processing.modes.modeBase import BaseMode
|
|
from modules.workflows.processing.shared.executionState import TaskExecutionState, shouldContinue
|
|
from modules.workflows.processing.shared.promptGenerationActionsReact import (
|
|
generateReactPlanSelectionPrompt,
|
|
generateReactParametersPrompt,
|
|
generateReactRefinementPrompt
|
|
)
|
|
from modules.workflows.processing.shared.placeholderFactory import extractReviewContent
|
|
from modules.workflows.processing.adaptive import IntentAnalyzer, ContentValidator, LearningEngine, ProgressTracker
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ReactMode(BaseMode):
|
|
"""React mode implementation - iterative plan-act-observe-refine loop"""
|
|
|
|
def __init__(self, services, workflow):
    """Set up React mode on top of the base mode.

    ``services`` and ``workflow`` are handed to ``BaseMode.__init__``; after
    that the adaptive helper objects used by the loop are created.
    """
    super().__init__(services, workflow)

    # Adaptive helpers; each is constructed without arguments.
    self.intentAnalyzer = IntentAnalyzer()
    self.contentValidator = ContentValidator()
    self.learningEngine = LearningEngine()
    self.progressTracker = ProgressTracker()

    # Result of the latest intent analysis; assigned in executeTask.
    self.currentIntent = None
    # No placeholder service is kept here: prompts are generated directly.
|
|
|
|
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
                              previousResults: List = None,
                              enhancedContext: TaskContext = None) -> List[ActionItem]:
    """Return an empty action list.

    React mode does not plan actions in a batch; they are produced one at a
    time inside the execution loop. All arguments are ignored.
    """
    noBatchActions: List[ActionItem] = []
    return noBatchActions
|
|
|
|
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
                      taskIndex: int = None, totalTasks: int = None) -> TaskResult:
    """Execute one task with the iterative plan-act-observe-refine loop.

    Each iteration selects one action, executes it, builds an observation,
    validates the produced documents against the analysed intent (when both
    are available) and asks the AI whether to continue. The loop ends when
    the step budget is used up, when a step raises, or when the progress
    tracker or the review decision says stop.

    Args:
        taskStep: Task to execute; ``objective`` and ``id`` are read here.
        workflow: Workflow the task belongs to. ``maxSteps`` (default 5,
            minimum 1) caps the iterations; ``id`` tags the telemetry log.
        context: Task context. Review decisions are appended to
            ``previous_review_result`` and reasons to ``improvements``.
        taskIndex: Task number for workflow bookkeeping and messages.
        totalTasks: Total number of tasks, forwarded to the message creator.

    Returns:
        A TaskResult with status COMPLETED. ``feedback`` is the last review
        reason, or 'Completed' when no reason is available.
    """
    logger.info(f"=== STARTING TASK {taskIndex or '?'}: {taskStep.objective} ===")

    # Analyse intent from the original user prompt plus the task objective.
    if self.services and hasattr(self.services, 'currentUserPrompt'):
        originalPrompt = self.services.currentUserPrompt
    else:
        originalPrompt = taskStep.objective
    combinedContext = f"Original request: {originalPrompt}\n\nCurrent task: {taskStep.objective}"

    self.currentIntent = self.intentAnalyzer.analyzeUserIntent(combinedContext, context)
    logger.info(f"Intent analysis (original + task): {self.currentIntent}")

    # Progress tracking starts fresh for every task.
    self.progressTracker.reset()

    if taskIndex is not None:
        self._updateWorkflowBeforeExecutingTask(taskIndex)
        self.services.workflow.setWorkflowContext(task_number=taskIndex)

    await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)

    state = TaskExecutionState(taskStep)
    # React mode budgets steps instead of retries. A workflow whose maxSteps
    # is explicitly None falls back to the default instead of crashing int().
    configuredSteps = getattr(workflow, 'maxSteps', 5)
    state.max_steps = max(1, int(configuredSteps if configuredSteps is not None else 5))
    logger.info(f"Using React mode execution with max_steps: {state.max_steps}")

    step = 1
    lastReviewDict = None

    while step <= state.max_steps:
        self._checkWorkflowStopped(workflow)

        # Publish the current action number (workflow.currentAction) for the UI.
        self._updateWorkflowBeforeExecutingAction(step)
        self.services.workflow.setWorkflowContext(action_number=step)

        try:
            t0 = time.time()
            selection = await self._planSelect(context)
            logger.info(f"React step {step}: Selected action: {selection}")

            # Per the original comments, the user-facing intention/completion
            # messages are produced by the standard message creator in _actExecute.
            result = await self._actExecute(context, selection, taskStep, workflow, step)
            observation = self._observeBuild(result)
            # Attach the deterministic label for clarity.
            observation['resultLabel'] = result.resultLabel

            if self.currentIntent and result.documents:
                validationResult = self.contentValidator.validateContent(result.documents, self.currentIntent)
                observation['contentValidation'] = validationResult
                logger.info(f"Content validation: {validationResult['overallSuccess']} (quality: {validationResult['qualityScore']:.2f})")

                # Learning and progress tracking both consume validationResult,
                # so they run only when validation actually produced one.
                stepFeedback = self._collectFeedback(result, validationResult, self.currentIntent)
                self.learningEngine.learnFromFeedback(stepFeedback, context, self.currentIntent)
                self.progressTracker.updateProgress(result, validationResult, self.currentIntent)

            decision = await self._refineDecide(context, observation)

            # Keep the decision on the context for the next iteration.
            if getattr(context, 'previous_review_result', None) is None:
                context.previous_review_result = []
            if decision:
                context.previous_review_result.append(decision)

            # Record the reason as a learning from this step.
            if isinstance(decision, dict) and decision.get('reason'):
                if getattr(context, 'improvements', None) is None:
                    context.improvements = []
                context.improvements.append(f"Step {step}: {decision.get('reason')}")

            # Telemetry: simple duration per step.
            duration = time.time() - t0
            self.services.interfaceDbChat.createLog({
                "workflowId": workflow.id,
                "message": f"react_step_duration_sec={duration:.3f}",
                "type": "info"
            })
            lastReviewDict = decision

        except Exception as e:
            # logger.exception keeps the traceback of the failing step.
            logger.exception(f"React step {step} error: {e}")
            break

        # Adaptive stopping: both the progress tracker and the review must agree.
        progressState = self.progressTracker.getCurrentProgress()
        continueByProgress = self.progressTracker.shouldContinue(progressState, observation.get('contentValidation', {}))
        continueByReview = shouldContinue(observation, lastReviewDict, step, state.max_steps)

        if not continueByProgress or not continueByReview:
            logger.info(f"Stopping at step {step}: progress={continueByProgress}, review={continueByReview}")
            break
        step += 1

    # Summarise the task result.
    # NOTE(review): a step that raised still ends up COMPLETED/success=True
    # here, exactly as before; confirm whether a failure status is wanted.
    status = TaskStatus.COMPLETED
    success = True
    feedback = 'Completed'
    if isinstance(lastReviewDict, dict) and lastReviewDict.get('reason'):
        feedback = lastReviewDict['reason']

    # The message creator receives an ad-hoc object exposing the review fields.
    await self.messageCreator.createTaskCompletionMessage(
        taskStep, workflow, taskIndex, totalTasks,
        type('ReviewResult', (), {'reason': feedback, 'met_criteria': [], 'quality_score': 8})())

    return TaskResult(
        taskId=taskStep.id,
        status=status,
        success=success,
        feedback=feedback,
        error=None if success else feedback
    )
|
|
|
|
async def _planSelect(self, context: TaskContext) -> Dict[str, Any]:
    """Plan: ask the AI to select exactly one action.

    Returns:
        The JSON object parsed from the AI response. Its 'action' entry is
        guaranteed to be a string; _actExecute expects "method.action".

    Raises:
        ValueError: if the response holds no JSON object, the JSON cannot be
            decoded, or 'action' is missing or not a string.
    """
    promptBundle = generateReactPlanSelectionPrompt(self.services, context)
    selectionPrompt = promptBundle.prompt
    selectionPlaceholders = promptBundle.placeholders

    self._writeTraceLog("React Plan Selection Prompt", selectionPrompt)
    self._writeTraceLog("React Plan Selection Placeholders", selectionPlaceholders)

    # Plan selection runs with plan-generation quality settings.
    planOptions = AiCallOptions(
        operationType=OperationType.GENERATE_PLAN,
        priority=Priority.QUALITY,
        compressPrompt=False,
        compressContext=False,
        processingMode=ProcessingMode.DETAILED,
        maxCost=0.10,
        maxProcessingTime=30
    )

    response = await self.services.ai.callAi(
        prompt=selectionPrompt,
        placeholders=selectionPlaceholders,
        options=planOptions
    )
    self._writeTraceLog("React Plan Selection Response", response)

    if not response:
        raise ValueError("No JSON in selection response")

    # The object is taken from the first '{' through the last '}'.
    firstBrace = response.find('{')
    lastBrace = response.rfind('}')
    if firstBrace == -1 or lastBrace == -1:
        raise ValueError("No JSON in selection response")

    selection = json.loads(response[firstBrace:lastBrace + 1])
    if not isinstance(selection.get('action'), str):
        raise ValueError("Selection missing 'action' as string")
    return selection
|
|
|
|
async def _actExecute(self, context: TaskContext, selection: Dict[str, Any], taskStep: TaskStep,
                      workflow: ChatWorkflow, stepIndex: int) -> ActionResult:
    """Act: resolve parameters for the selected action and execute it.

    Parameters come from ``selection['parameters']`` when that is a non-empty
    dict; otherwise the AI is asked for them. The user's language is added as
    a default when none is given.

    Args:
        context: Task context used to build the parameter prompt.
        selection: Plan selection; 'action' must be "method.action".
        taskStep: Task being executed, forwarded to the action executor.
        workflow: Workflow, forwarded to the action executor.
        stepIndex: 1-based step number, used in the result label.

    Returns:
        The ActionResult returned by the action executor.

    Raises:
        ValueError: if the action name is not of the form "method.action".
        RuntimeError: if the action item could not be created.
    """
    compoundActionName = selection.get('action', '')

    # "ai.webResearch" -> method "ai", action "webResearch".
    if '.' not in compoundActionName:
        raise ValueError(f"Invalid compound action name: {compoundActionName}. Expected format: method.action")
    methodName, actionName = compoundActionName.split('.', 1)

    suppliedParameters = selection.get('parameters')
    if isinstance(suppliedParameters, dict) and suppliedParameters:
        logger.info("Using parameters from action selection")
        # Copy so the language default below does not alter the caller's selection.
        parameters = dict(suppliedParameters)
    else:
        logger.info("No parameters in action selection, requesting from AI")
        bundle = generateReactParametersPrompt(self.services, context, compoundActionName)
        promptTemplate = bundle.prompt
        placeholders = bundle.placeholders

        self._writeTraceLog("React Parameters Prompt", promptTemplate)
        self._writeTraceLog("React Parameters Placeholders", placeholders)

        # Parameter suggestion uses the balanced analysis settings.
        options = AiCallOptions(
            operationType=OperationType.ANALYSE_CONTENT,
            priority=Priority.BALANCED,
            compressPrompt=True,
            compressContext=False,
            processingMode=ProcessingMode.ADVANCED,
            maxCost=0.05,
            maxProcessingTime=30,
            temperature=0.3,  # Slightly higher temperature for better instruction following
            # maxTokens not set - use model's maximum for big JSON responses
            resultFormat="json"  # Explicitly request JSON format
        )

        paramsResp = await self.services.ai.callAi(
            prompt=promptTemplate,
            placeholders=placeholders,
            options=options
        )

        rawJson = paramsResp[paramsResp.find('{'):paramsResp.rfind('}') + 1] if paramsResp else '{}'
        try:
            paramObj = json.loads(rawJson)
        except Exception as e:
            logger.error(f"Failed to parse AI parameters response as JSON: {str(e)}")
            logger.error(f"Response was: {paramsResp}")
            paramObj = None
        else:
            # Trace only the parsed object to avoid duplicating the raw text.
            self._writeTraceLog("React Parameters Response", paramObj)

        parameters = paramObj.get('parameters') if isinstance(paramObj, dict) else None
        if not isinstance(parameters, dict):
            # A missing, null or non-object 'parameters' entry means none.
            parameters = {}

    # Minimal in-code default: the user's language.
    userLanguage = getattr(getattr(self.services, 'user', None), 'language', None)
    if 'language' not in parameters and userLanguage:
        parameters['language'] = userLanguage

    # Synthetic ActionItem for execution routing and labels.
    currentRound = getattr(self.workflow, 'currentRound', 0)
    currentTask = getattr(self.workflow, 'currentTask', 0)
    resultLabel = f"round{currentRound}_task{currentTask}_action{stepIndex}_results"

    taskAction = self._createActionItem({
        "execMethod": methodName,
        "execAction": actionName,
        "execParameters": parameters,
        "execResultLabel": resultLabel,
        "status": TaskStatus.PENDING
    })
    if taskAction is None:
        # _createActionItem logs the cause and returns None on failure.
        raise RuntimeError(f"Could not create action item for {compoundActionName}")

    # Message creation is handled inside the single-action flow.
    return await self.actionExecutor.executeSingleAction(taskAction, workflow, taskStep, currentTask, stepIndex, 1)
|
|
|
|
def _observeBuild(self, actionResult: ActionResult) -> Dict[str, Any]:
    """Observe: build a compact observation dict from an ActionResult.

    Document content is never copied; only metadata, a content size and any
    'comment' found in ``documentData`` are reported.

    Args:
        actionResult: Result of the executed action. A falsy value yields an
            unsuccessful observation without documents.

    Returns:
        Dict with 'success', 'resultLabel', 'documentsCount', 'previews'
        (one metadata dict per document) and 'notes'. 'contentAnalysis' is
        added when an intent is set and documents exist.
    """
    documents = (actionResult.documents if actionResult else None) or []
    previews = []
    notes = []

    for doc in documents:
        # Attributes the document does not have are dropped via the 'Unknown' marker.
        candidateMetadata = (
            ("name", getattr(doc, 'documentName', 'Unknown')),
            ("mimeType", getattr(doc, 'mimeType', 'Unknown')),
            ("size", getattr(doc, 'size', 'Unknown')),
            ("created", getattr(doc, 'created', 'Unknown')),
            ("modified", getattr(doc, 'modified', 'Unknown')),
            ("typeGroup", getattr(doc, 'typeGroup', 'Unknown')),
            ("documentId", getattr(doc, 'documentId', 'Unknown')),
            ("reference", getattr(doc, 'reference', 'Unknown')),
        )
        docMetadata = {key: value for key, value in candidateMetadata if value != 'Unknown'}

        data = getattr(doc, 'documentData', None)
        if data:
            # Report the content size instead of the content itself.
            payload = data['content'] if isinstance(data, dict) and 'content' in data else data
            docMetadata['contentSize'] = f"{len(str(payload))} characters"

            if isinstance(data, dict):
                comment = data.get("comment", "")
                if comment:
                    notes.append(f"Document '{docMetadata.get('name', 'Unknown')}': {comment}")

        previews.append(docMetadata)

    observation = {
        "success": bool(actionResult.success) if actionResult else False,
        "resultLabel": (actionResult.resultLabel if actionResult else None) or "",
        "documentsCount": len(documents),
        "previews": previews,
        "notes": notes
    }

    # Content analysis needs an analysed intent to compare against.
    if self.currentIntent and documents:
        observation['contentAnalysis'] = self._analyzeContent(documents)

    return observation
|
|
|
|
def _analyzeContent(self, documents: List[Any]) -> Dict[str, Any]:
|
|
"""Analyzes content of documents for adaptive learning"""
|
|
try:
|
|
if not documents:
|
|
return {"contentType": "none", "contentSnippet": "", "intentMatch": False}
|
|
|
|
# Extract content from first document
|
|
firstDoc = documents[0]
|
|
content = ""
|
|
if hasattr(firstDoc, 'documentData'):
|
|
data = firstDoc.documentData
|
|
if isinstance(data, dict) and 'content' in data:
|
|
content = str(data['content'])
|
|
else:
|
|
content = str(data)
|
|
|
|
# Classify content type
|
|
contentType = self._classifyContent(content)
|
|
|
|
# Create content snippet
|
|
contentSnippet = content[:200] + "..." if len(content) > 200 else content
|
|
|
|
# Assess intent match
|
|
intentMatch = self._assessIntentMatch(content, self.currentIntent)
|
|
|
|
return {
|
|
"contentType": contentType,
|
|
"contentSnippet": contentSnippet,
|
|
"intentMatch": intentMatch
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing content: {str(e)}")
|
|
return {"contentType": "error", "contentSnippet": "", "intentMatch": False}
|
|
|
|
def _classifyContent(self, content: str) -> str:
|
|
"""Classifies the type of content"""
|
|
if not content:
|
|
return "empty"
|
|
|
|
# Check for code
|
|
codeIndicators = ['def ', 'function', 'import ', 'class ', 'for ', 'while ', 'if ']
|
|
if any(indicator in content.lower() for indicator in codeIndicators):
|
|
return "code"
|
|
|
|
# Check for numbers
|
|
if re.search(r'\b\d+\b', content):
|
|
return "numbers"
|
|
|
|
# Check for structured content
|
|
if any(indicator in content for indicator in ['\n', '\t', '|', '-', '*', '1.', '2.']):
|
|
return "structured"
|
|
|
|
# Default to text
|
|
return "text"
|
|
|
|
def _assessIntentMatch(self, content: str, intent: Dict[str, Any]) -> bool:
|
|
"""Assesses if content matches the user intent"""
|
|
if not intent:
|
|
return False
|
|
|
|
dataType = intent.get("dataType", "unknown")
|
|
|
|
if dataType == "numbers":
|
|
# Check if content contains actual numbers, not code
|
|
hasNumbers = bool(re.search(r'\b\d+\b', content))
|
|
isNotCode = not any(keyword in content.lower() for keyword in ['def ', 'function', 'import '])
|
|
return hasNumbers and isNotCode
|
|
|
|
elif dataType == "text":
|
|
# Check if content is readable text
|
|
words = re.findall(r'\b\w+\b', content)
|
|
return len(words) > 5
|
|
|
|
elif dataType == "documents":
|
|
# Check if content is suitable for document creation
|
|
hasStructure = any(indicator in content for indicator in ['\n', '\t', '|', '-', '*'])
|
|
hasContent = len(content.strip()) > 50
|
|
return hasStructure and hasContent
|
|
|
|
return True # Default to match for unknown types
|
|
|
|
def _collectFeedback(self, result: Any, validation: Dict[str, Any], intent: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Collects comprehensive feedback from action execution"""
|
|
try:
|
|
# Extract content summary
|
|
contentDelivered = ""
|
|
if result.documents:
|
|
firstDoc = result.documents[0]
|
|
if hasattr(firstDoc, 'documentData'):
|
|
data = firstDoc.documentData
|
|
if isinstance(data, dict) and 'content' in data:
|
|
content = str(data['content'])
|
|
contentDelivered = content[:100] + "..." if len(content) > 100 else content
|
|
else:
|
|
contentDelivered = str(data)[:100] + "..." if len(str(data)) > 100 else str(data)
|
|
|
|
return {
|
|
"actionAttempted": result.resultLabel or "unknown",
|
|
"parametersUsed": {}, # Would be extracted from action context
|
|
"contentDelivered": contentDelivered,
|
|
"intentMatchScore": validation.get('qualityScore', 0),
|
|
"qualityScore": validation.get('qualityScore', 0),
|
|
"issuesFound": validation.get('improvementSuggestions', []),
|
|
"learningOpportunities": validation.get('improvementSuggestions', []),
|
|
"userSatisfaction": None, # Would be collected from user feedback
|
|
"timestamp": datetime.now(timezone.utc).timestamp()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error collecting feedback: {str(e)}")
|
|
return {
|
|
"actionAttempted": "unknown",
|
|
"parametersUsed": {},
|
|
"contentDelivered": "",
|
|
"intentMatchScore": 0,
|
|
"qualityScore": 0,
|
|
"issuesFound": [],
|
|
"learningOpportunities": [],
|
|
"userSatisfaction": None,
|
|
"timestamp": datetime.now(timezone.utc).timestamp()
|
|
}
|
|
|
|
async def _refineDecide(self, context: TaskContext, observation: Dict[str, Any]) -> Dict[str, Any]:
    """Refine: ask the AI whether to continue or stop, with a reason.

    The review text is the extracted review content, followed by the content
    validation and content analysis sections (when present in
    ``observation``) and the current progress state.

    Args:
        context: Task context; ``task_step`` and ``workflow_id`` are read.
        observation: Observation dict built for the executed action.

    Returns:
        The JSON object parsed from the AI response. An unparsable response
        yields ``{"decision": "continue", "reason": "default"}``; an empty
        response yields ``{}``.
    """
    # Function-level import kept as in the original code.
    from modules.datamodels.datamodelChat import ReviewContext

    reviewContext = ReviewContext(
        task_step=context.task_step,
        task_actions=[],
        action_results=[],  # React mode doesn't have action results in this context
        step_result={'observation': observation},
        workflow_id=context.workflow_id,
        previous_results=[]
    )

    # Collect the sections in a list and join once at the end.
    reviewParts = [extractReviewContent(reviewContext) or ""]

    if 'contentValidation' in observation:
        validation = observation['contentValidation']
        reviewParts.append("\n\nCONTENT VALIDATION:\n")
        reviewParts.append(f"Overall Success: {validation['overallSuccess']}\n")
        reviewParts.append(f"Quality Score: {validation['qualityScore']:.2f}\n")
        if validation['improvementSuggestions']:
            reviewParts.append(f"Improvement Suggestions: {', '.join(validation['improvementSuggestions'])}\n")

    if 'contentAnalysis' in observation:
        analysis = observation['contentAnalysis']
        reviewParts.append("\nCONTENT ANALYSIS:\n")
        reviewParts.append(f"Content Type: {analysis['contentType']}\n")
        reviewParts.append(f"Intent Match: {analysis['intentMatch']}\n")
        if analysis['contentSnippet']:
            reviewParts.append(f"Content Preview: {analysis['contentSnippet']}\n")

    progressState = self.progressTracker.getCurrentProgress()
    reviewParts.append("\nPROGRESS STATE:\n")
    reviewParts.append(f"Completed Objectives: {len(progressState['completedObjectives'])}\n")
    reviewParts.append(f"Partial Achievements: {len(progressState['partialAchievements'])}\n")
    reviewParts.append(f"Failed Attempts: {len(progressState['failedAttempts'])}\n")
    reviewParts.append(f"Current Phase: {progressState['currentPhase']}\n")
    if progressState['nextActionsSuggested']:
        reviewParts.append(f"Next Action Suggestions: {', '.join(progressState['nextActionsSuggested'])}\n")

    enhancedReviewContent = "".join(reviewParts)

    # The prompt generator returns both the template and its placeholders;
    # the review text reaches the AI only through this bundle.
    bundle = generateReactRefinementPrompt(self.services, context, enhancedReviewContent)
    promptTemplate = bundle.prompt
    placeholders = bundle.placeholders

    self._writeTraceLog("React Refinement Prompt", promptTemplate)
    self._writeTraceLog("React Refinement Placeholders", placeholders)

    # The refinement decision uses the balanced analysis settings.
    options = AiCallOptions(
        operationType=OperationType.ANALYSE_CONTENT,
        priority=Priority.BALANCED,
        compressPrompt=True,
        compressContext=False,
        processingMode=ProcessingMode.ADVANCED,
        maxCost=0.05,
        maxProcessingTime=30
    )

    resp = await self.services.ai.callAi(
        prompt=promptTemplate,
        placeholders=placeholders,
        options=options
    )
    self._writeTraceLog("React Refinement Response", resp)

    rawJson = resp[resp.find('{'):resp.rfind('}') + 1] if resp else '{}'
    try:
        decision = json.loads(rawJson)
    except Exception as e:
        # Keep the loop going, but leave a trace of the unusable response.
        logger.warning(f"Could not parse refinement response as JSON, defaulting to continue: {e}")
        decision = {"decision": "continue", "reason": "default"}
    return decision
|
|
|
|
async def _createReactActionMessage(self, workflow: ChatWorkflow, selection: Dict[str, Any],
                                    step: int, maxSteps: int, taskIndex: int, messageType: str,
                                    result: ActionResult = None, observation: Dict[str, Any] = None):
    """Create a user-friendly workflow message for a React action.

    Args:
        workflow: Workflow the message is stored on and appended to.
        selection: Plan selection. 'action' may be the compound string
            "method.action" (as validated by _planSelect) or a dict with
            'method' and 'name'.
        step: Current step number.
        maxSteps: Step budget shown in the message.
        taskIndex: Task number stored on the message.
        messageType: "before" for the intention message, "after" for the
            result message; any other value creates nothing.
        result: Action result, used for "after" messages.
        observation: Observation dict, used for "after" messages.

    Errors are logged and swallowed; nothing is returned.
    """
    try:
        action = selection.get('action', {})
        if isinstance(action, str):
            # Compound form "method.action", the shape _planSelect enforces.
            method, _, actionName = action.partition('.')
        elif isinstance(action, dict):
            method = action.get('method', '')
            actionName = action.get('name', '')
        else:
            method, actionName = '', ''

        # Fall back to English when no user or language is available.
        userLanguage = (self.services.user.language if self.services and self.services.user else None) or 'en'

        if messageType == "before":
            userMessage = await self._generateActionIntentionMessage(method, actionName, userLanguage)
            messageContent = f"🔄 **Step {step}/{maxSteps}**\n\n{userMessage}"
            status = "step"
            actionProgress = "pending"
            documentsLabel = f"action_{step}_intention"

        elif messageType == "after":
            userMessage = await self._generateActionResultMessage(method, actionName, result, observation, userLanguage)
            succeeded = bool(result and result.success)
            successIcon = "✅" if succeeded else "❌"
            messageContent = f"{successIcon} **Step {step}/{maxSteps} Complete**\n\n{userMessage}"
            status = "step"
            actionProgress = "success" if succeeded else "fail"
            documentsLabel = observation.get('resultLabel') if observation else f"action_{step}_result"
        else:
            return

        messageData = {
            "workflowId": workflow.id,
            "role": "assistant",
            "message": messageContent,
            "status": status,
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": self.services.utils.getUtcTimestamp(),
            "documentsLabel": documentsLabel,
            "documents": [],
            "roundNumber": workflow.currentRound,
            "taskNumber": taskIndex,
            "actionNumber": step,
            "actionProgress": actionProgress
        }

        message = self.services.interfaceDbChat.createMessage(messageData)
        if message:
            workflow.messages.append(message)

    except Exception as e:
        # Message creation is best effort and must not break the workflow.
        logger.error(f"Error creating React action message: {str(e)}")
|
|
|
|
async def _generateActionIntentionMessage(self, method: str, actionName: str, userLanguage: str):
    """Ask the AI for a short, user-friendly description of the upcoming action.

    Returns the stripped AI response, or a generic "Executing ..." text when
    the response is empty or the call fails (the failure is logged).
    """
    fallbackMessage = f"Executing {method}.{actionName} action..."
    try:
        prompt = f"""Generate a brief, user-friendly message explaining what the {method}.{actionName} action will do.

User language: {userLanguage}


Return only the user-friendly message, no technical details."""

        # Cheap, fast call: this is only a status text for the user.
        quickOptions = AiCallOptions(
            operationType=OperationType.GENERATE_CONTENT,
            priority=Priority.SPEED,
            compressPrompt=True,
            maxCost=0.01,
            maxProcessingTime=5
        )
        response = await self.services.ai.callAi(prompt=prompt, options=quickOptions)

        if response:
            return response.strip()
        return fallbackMessage

    except Exception as e:
        logger.error(f"Error generating action intention message: {str(e)}")
        return fallbackMessage
|
|
|
|
async def _generateActionResultMessage(self, method: str, actionName: str, result: ActionResult,
                                       observation: Dict[str, Any], userLanguage: str):
    """Ask the AI for a short, user-friendly summary of an action's result.

    The document count is taken from ``result.documents`` when present,
    otherwise from ``observation['documentsCount']``. Returns the stripped AI
    response, or a generic "... action completed" text when the response is
    empty or the call fails (the failure is logged).
    """
    fallbackMessage = f"{method}.{actionName} action completed"
    try:
        docCount = 0
        if result and result.documents:
            docCount = len(result.documents)
        elif observation and observation.get('documentsCount', 0) > 0:
            docCount = observation.get('documentsCount', 0)
        resultContext = f"Generated {docCount} document(s)" if docCount else ""

        prompt = f"""Generate a brief, user-friendly message explaining the result of the {method}.{actionName} action.

User language: {userLanguage}
Success: {result.success if result else 'Unknown'}
Result context: {resultContext}

Return only the user-friendly message, no technical details."""

        # Cheap, fast call: this is only a status text for the user.
        quickOptions = AiCallOptions(
            operationType=OperationType.GENERATE_CONTENT,
            priority=Priority.SPEED,
            compressPrompt=True,
            maxCost=0.01,
            maxProcessingTime=5
        )
        response = await self.services.ai.callAi(prompt=prompt, options=quickOptions)

        if response:
            return response.strip()
        return fallbackMessage

    except Exception as e:
        logger.error(f"Error generating action result message: {str(e)}")
        return fallbackMessage
|
|
|
|
def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem:
    """Persist a new action record and return it as an ActionItem.

    NOTE: this class defines _createActionItem twice; Python binds the later
    definition, so this earlier one is shadowed.

    Args:
        actionData: Field values for the action. 'execMethod' and
            'execAction' are required; 'id', 'status' and 'execParameters'
            get defaults. The dict itself is not modified.

    Returns:
        The created ActionItem, or None when a required field is missing or
        creation fails (the cause is logged).
    """
    try:
        import uuid

        # Work on a copy so the caller's dict stays untouched.
        record = dict(actionData)
        if not record.get("id"):
            record["id"] = f"action_{uuid.uuid4()}"
        record.setdefault("status", TaskStatus.PENDING)

        for requiredField in ("execMethod", "execAction"):
            if requiredField not in record:
                logger.error(f"{requiredField} is required for task action")
                return None

        record.setdefault("execParameters", {})

        # Only the simple fields are written; object fields are not used here.
        simpleFields, _objectFields = self.services.interfaceDbChat._separate_object_fields(ActionItem, record)
        createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)

        # A missing or null stored timestamp falls back to "now".
        timestamp = createdAction.get("timestamp")
        if timestamp is None:
            timestamp = self.services.utils.getUtcTimestamp()

        return ActionItem(
            id=createdAction["id"],
            execMethod=createdAction["execMethod"],
            execAction=createdAction["execAction"],
            execParameters=createdAction.get("execParameters", {}),
            execResultLabel=createdAction.get("execResultLabel"),
            expectedDocumentFormats=createdAction.get("expectedDocumentFormats"),
            status=createdAction.get("status", TaskStatus.PENDING),
            error=createdAction.get("error"),
            retryCount=createdAction.get("retryCount", 0),
            retryMax=createdAction.get("retryMax", 3),
            processingTime=createdAction.get("processingTime"),
            timestamp=float(timestamp),
            result=createdAction.get("result"),
            resultDocuments=createdAction.get("resultDocuments", []),
            userMessage=createdAction.get("userMessage")
        )

    except Exception as e:
        logger.error(f"Error creating task action: {str(e)}")
        return None
|
|
|
|
def _updateWorkflowBeforeExecutingTask(self, taskNumber: int):
    """Record on the workflow that ``taskNumber`` is about to run.

    Sets currentTask and resets currentAction and totalActions to 0, first
    on the in-memory workflow and then in the database. Errors are logged
    and swallowed.
    """
    updateData = {
        "currentTask": taskNumber,
        "currentAction": 0,
        "totalActions": 0
    }
    try:
        # Mirror the same values on the in-memory workflow object.
        for fieldName, fieldValue in updateData.items():
            setattr(self.workflow, fieldName, fieldValue)

        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
        logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}: {updateData}")

    except Exception as e:
        logger.error(f"Error updating workflow before executing task: {str(e)}")
|
|
|
|
def _updateWorkflowBeforeExecutingAction(self, actionNumber: int):
    """Record on the workflow that action ``actionNumber`` is about to run.

    Sets currentAction on the in-memory workflow and then in the database.
    Errors are logged and swallowed.
    """
    updateData = {"currentAction": actionNumber}
    try:
        # Mirror the same value on the in-memory workflow object.
        for fieldName, fieldValue in updateData.items():
            setattr(self.workflow, fieldName, fieldValue)

        self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
        logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}: {updateData}")

    except Exception as e:
        logger.error(f"Error updating workflow before executing action: {str(e)}")
|
|
|
|
def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem:
    """Persist a new action record and return it as an ActionItem.

    NOTE: an identical earlier definition of this method exists in the class;
    this later definition is the one Python binds.

    Args:
        actionData: Field values for the action. 'execMethod' and
            'execAction' are required; 'id', 'status' and 'execParameters'
            get defaults. The dict itself is not modified.

    Returns:
        The created ActionItem, or None when a required field is missing or
        creation fails (the cause is logged).
    """
    try:
        import uuid

        # Work on a copy so the caller's dict stays untouched.
        record = dict(actionData)
        if not record.get("id"):
            record["id"] = f"action_{uuid.uuid4()}"
        record.setdefault("status", TaskStatus.PENDING)

        for requiredField in ("execMethod", "execAction"):
            if requiredField not in record:
                logger.error(f"{requiredField} is required for task action")
                return None

        record.setdefault("execParameters", {})

        # Only the simple fields are written; object fields are not used here.
        simpleFields, _objectFields = self.services.interfaceDbChat._separate_object_fields(ActionItem, record)
        createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)

        # A missing or null stored timestamp falls back to "now".
        timestamp = createdAction.get("timestamp")
        if timestamp is None:
            timestamp = self.services.utils.getUtcTimestamp()

        return ActionItem(
            id=createdAction["id"],
            execMethod=createdAction["execMethod"],
            execAction=createdAction["execAction"],
            execParameters=createdAction.get("execParameters", {}),
            execResultLabel=createdAction.get("execResultLabel"),
            expectedDocumentFormats=createdAction.get("expectedDocumentFormats"),
            status=createdAction.get("status", TaskStatus.PENDING),
            error=createdAction.get("error"),
            retryCount=createdAction.get("retryCount", 0),
            retryMax=createdAction.get("retryMax", 3),
            processingTime=createdAction.get("processingTime"),
            timestamp=float(timestamp),
            result=createdAction.get("result"),
            resultDocuments=createdAction.get("resultDocuments", []),
            userMessage=createdAction.get("userMessage")
        )

    except Exception as e:
        logger.error(f"Error creating task action: {str(e)}")
        return None
|
|
|
|
def _writeTraceLog(self, contextText: str, data: Any) -> None:
    """Append a formatted trace entry to ``log_trace.log`` when debug logging is on.

    Args:
        contextText: Heading for the entry, written after the timestamp.
        data: Payload to record. Dicts and lists are pretty-printed as JSON;
            strings and other objects are pretty-printed when their text is
            valid JSON and written as plain text otherwise. None is allowed.

    The log directory comes from the "APP_LOGGING_LOG_DIR" setting (default
    "./"); a relative value is resolved against the directory four levels
    above this file. Tracing is best effort: every error is swallowed.
    """
    try:
        import os
        from datetime import datetime, timezone

        # Check the effective level. A module logger normally has its own
        # level unset and inherits from its ancestors, so comparing
        # logger.level directly would let traces through at any level.
        if not logger.isEnabledFor(logging.DEBUG):
            return

        logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
        if not os.path.isabs(logDir):
            # Relative paths are resolved against the gateway directory.
            gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
            logDir = os.path.join(gatewayDir, logDir)

        os.makedirs(logDir, exist_ok=True)
        traceFile = os.path.join(logDir, "log_trace.log")

        # timezone.utc is used instead of datetime.UTC, which needs Python 3.11+.
        timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

        traceEntry = f"[{timestamp}] {contextText}\n"
        traceEntry += "=" * 80 + "\n"

        if data is None:
            traceEntry += "No data provided\n"
        else:
            try:
                if isinstance(data, (dict, list)):
                    jsonStr = json.dumps(data, indent=2, default=str, ensure_ascii=False, sort_keys=False)
                    traceEntry += f"JSON Data:\n{jsonStr}\n"
                else:
                    isText = isinstance(data, str)
                    text = data if isText else str(data)
                    try:
                        # Try JSON first so embedded JSON gets pretty-printed.
                        parsed = json.loads(text)
                        jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False)
                        source = "string" if isText else "object"
                        traceEntry += f"JSON Data (parsed from {source}):\n{jsonStr}\n"
                    except (json.JSONDecodeError, TypeError):
                        # Not JSON: plain text, with escaped "\n" turned into line breaks.
                        label = "Text Data" if isText else "Object Data"
                        formattedData = text.replace('\\n', '\n')
                        traceEntry += f"{label}:\n{formattedData}\n"
            except Exception:
                # Fall back to the simple string representation.
                traceEntry += f"Data (fallback): {str(data)}\n"

        traceEntry += "=" * 80 + "\n\n"

        with open(traceFile, "a", encoding="utf-8") as f:
            f.write(traceEntry)

    except Exception:
        # Deliberately silent: logging a trace failure could recurse.
        pass
|
|
|