From 53bfe06dbe531dc8e51839dce1a9d7a2c06ff4d1 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 2 Nov 2025 18:41:07 +0100
Subject: [PATCH] streamlined ai calling chain for react and workflow mode
---
modules/datamodels/datamodelChat.py | 98 ++++--
modules/datamodels/datamodelDocument.py | 32 +-
.../processing/adaptive/contentValidator.py | 59 +++-
.../processing/adaptive/intentAnalyzer.py | 18 +-
.../processing/adaptive/learningEngine.py | 6 +-
.../processing/core/messageCreator.py | 8 +-
.../workflows/processing/core/taskPlanner.py | 53 +--
.../workflows/processing/core/validator.py | 2 +-
.../processing/modes/modeActionplan.py | 318 ++++++++++--------
.../workflows/processing/modes/modeReact.py | 267 ++++++++++-----
.../processing/shared/executionState.py | 53 ++-
.../processing/shared/placeholderFactory.py | 100 +++---
.../shared/promptGenerationActionsReact.py | 4 +-
.../shared/promptGenerationTaskplan.py | 4 +-
.../workflows/processing/workflowProcessor.py | 4 +-
modules/workflows/workflowManager.py | 28 +-
16 files changed, 657 insertions(+), 397 deletions(-)
diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py
index 3c03e64b..9b173fd1 100644
--- a/modules/datamodels/datamodelChat.py
+++ b/modules/datamodels/datamodelChat.py
@@ -531,8 +531,17 @@ registerModelLabels(
class ObservationPreview(BaseModel):
name: str = Field(description="Document name or URL label")
- mime: str = Field(description="MIME type or kind")
- snippet: str = Field(description="Short snippet or summary")
+ mime: Optional[str] = Field(default=None, description="MIME type or kind (legacy field)")
+ snippet: Optional[str] = Field(default=None, description="Short snippet or summary")
+ # Extended metadata fields
+ mimeType: Optional[str] = Field(default=None, description="MIME type")
+ size: Optional[str] = Field(default=None, description="File size")
+ created: Optional[str] = Field(default=None, description="Creation timestamp")
+ modified: Optional[str] = Field(default=None, description="Modification timestamp")
+ typeGroup: Optional[str] = Field(default=None, description="Document type group")
+ documentId: Optional[str] = Field(default=None, description="Document ID")
+ reference: Optional[str] = Field(default=None, description="Document reference")
+ contentSize: Optional[str] = Field(default=None, description="Content size indicator")
registerModelLabels(
@@ -556,6 +565,13 @@ class Observation(BaseModel):
notes: List[str] = Field(
default_factory=list, description="Short notes or key facts"
)
+ # Extended fields for enhanced validation
+ contentValidation: Optional[Dict[str, Any]] = Field(
+ default=None, description="Content validation results"
+ )
+ contentAnalysis: Optional[Dict[str, Any]] = Field(
+ default=None, description="Content analysis results"
+ )
registerModelLabels(
@@ -751,11 +767,21 @@ class TaskStep(BaseModel):
id: str
objective: str
dependencies: Optional[list[str]] = Field(default_factory=list)
- success_criteria: Optional[list[str]] = Field(default_factory=list)
- estimated_complexity: Optional[str] = None
+ successCriteria: Optional[list[str]] = Field(default_factory=list)
+ estimatedComplexity: Optional[str] = None
userMessage: Optional[str] = Field(
None, description="User-friendly message in user's language"
)
+ # Format details extracted from intent analysis
+ dataType: Optional[str] = Field(
+ None, description="Expected data type (text, numbers, documents, etc.)"
+ )
+ expectedFormat: Optional[str] = Field(
+ None, description="Expected output format (json, csv, markdown, etc.)"
+ )
+ qualityRequirements: Optional[Dict[str, Any]] = Field(
+ None, description="Quality requirements and constraints"
+ )
registerModelLabels(
@@ -765,8 +791,8 @@ registerModelLabels(
"id": {"en": "ID", "fr": "ID"},
"objective": {"en": "Objective", "fr": "Objectif"},
"dependencies": {"en": "Dependencies", "fr": "Dépendances"},
- "success_criteria": {"en": "Success Criteria", "fr": "Critères de succès"},
- "estimated_complexity": {
+ "successCriteria": {"en": "Success Criteria", "fr": "Critères de succès"},
+ "estimatedComplexity": {
"en": "Estimated Complexity",
"fr": "Complexité estimée",
},
@@ -825,27 +851,27 @@ registerModelLabels(
class TaskContext(BaseModel):
- task_step: TaskStep
+ taskStep: TaskStep
workflow: Optional["ChatWorkflow"] = None
- workflow_id: Optional[str] = None
- available_documents: Optional[str] = "No documents available"
- available_connections: Optional[list[str]] = Field(default_factory=list)
- previous_results: Optional[list[str]] = Field(default_factory=list)
- previous_handover: Optional[TaskHandover] = None
+ workflowId: Optional[str] = None
+ availableDocuments: Optional[str] = "No documents available"
+ availableConnections: Optional[list[str]] = Field(default_factory=list)
+ previousResults: Optional[list[str]] = Field(default_factory=list)
+ previousHandover: Optional[TaskHandover] = None
improvements: Optional[list[str]] = Field(default_factory=list)
- retry_count: Optional[int] = 0
- previous_action_results: Optional[list] = Field(default_factory=list)
- previous_review_result: Optional[dict] = None
- is_regeneration: Optional[bool] = False
- failure_patterns: Optional[list[str]] = Field(default_factory=list)
- failed_actions: Optional[list] = Field(default_factory=list)
- successful_actions: Optional[list] = Field(default_factory=list)
- criteria_progress: Optional[dict] = None
+ retryCount: Optional[int] = 0
+ previousActionResults: Optional[list] = Field(default_factory=list)
+ previousReviewResult: Optional[dict] = None
+ isRegeneration: Optional[bool] = False
+ failurePatterns: Optional[list[str]] = Field(default_factory=list)
+ failedActions: Optional[list] = Field(default_factory=list)
+ successfulActions: Optional[list] = Field(default_factory=list)
+ criteriaProgress: Optional[dict] = None
def getDocumentReferences(self) -> List[str]:
docs = []
- if self.previous_handover:
- for doc_exchange in self.previous_handover.inputDocuments:
+ if self.previousHandover:
+ for doc_exchange in self.previousHandover.inputDocuments:
docs.extend(doc_exchange.documents)
return list(set(docs))
@@ -857,22 +883,22 @@ class TaskContext(BaseModel):
class ReviewContext(BaseModel):
- task_step: TaskStep
- task_actions: Optional[list] = Field(default_factory=list)
- action_results: Optional[list] = Field(default_factory=list)
- step_result: Optional[dict] = Field(default_factory=dict)
- workflow_id: Optional[str] = None
- previous_results: Optional[list[str]] = Field(default_factory=list)
+ taskStep: TaskStep
+ taskActions: Optional[list] = Field(default_factory=list)
+ actionResults: Optional[list] = Field(default_factory=list)
+ stepResult: Optional[dict] = Field(default_factory=dict)
+ workflowId: Optional[str] = None
+ previousResults: Optional[list[str]] = Field(default_factory=list)
class ReviewResult(BaseModel):
status: str
reason: Optional[str] = None
improvements: Optional[list[str]] = Field(default_factory=list)
- quality_score: Optional[int] = 5
- missing_outputs: Optional[list[str]] = Field(default_factory=list)
- met_criteria: Optional[list[str]] = Field(default_factory=list)
- unmet_criteria: Optional[list[str]] = Field(default_factory=list)
+ qualityScore: Optional[float] = Field(default=5.0, description="Quality score (0-10)")
+ missingOutputs: Optional[list[str]] = Field(default_factory=list)
+ metCriteria: Optional[list[str]] = Field(default_factory=list)
+ unmetCriteria: Optional[list[str]] = Field(default_factory=list)
confidence: Optional[float] = 0.5
userMessage: Optional[str] = Field(
None, description="User-friendly message in user's language"
@@ -886,10 +912,10 @@ registerModelLabels(
"status": {"en": "Status", "fr": "Statut"},
"reason": {"en": "Reason", "fr": "Raison"},
"improvements": {"en": "Improvements", "fr": "Améliorations"},
- "quality_score": {"en": "Quality Score", "fr": "Score de qualité"},
- "missing_outputs": {"en": "Missing Outputs", "fr": "Sorties manquantes"},
- "met_criteria": {"en": "Met Criteria", "fr": "Critères respectés"},
- "unmet_criteria": {"en": "Unmet Criteria", "fr": "Critères non respectés"},
+ "qualityScore": {"en": "Quality Score", "fr": "Score de qualité"},
+ "missingOutputs": {"en": "Missing Outputs", "fr": "Sorties manquantes"},
+ "metCriteria": {"en": "Met Criteria", "fr": "Critères respectés"},
+ "unmetCriteria": {"en": "Unmet Criteria", "fr": "Critères non respectés"},
"confidence": {"en": "Confidence", "fr": "Confiance"},
"userMessage": {"en": "User Message", "fr": "Message utilisateur"},
},
diff --git a/modules/datamodels/datamodelDocument.py b/modules/datamodels/datamodelDocument.py
index 33472130..37797d5b 100644
--- a/modules/datamodels/datamodelDocument.py
+++ b/modules/datamodels/datamodelDocument.py
@@ -7,9 +7,9 @@ class DocumentMetadata(BaseModel):
"""Metadata for the entire document."""
title: str = Field(description="Document title")
author: Optional[str] = Field(default=None, description="Document author")
- created_at: datetime = Field(default_factory=datetime.now, description="Creation timestamp")
- source_documents: List[str] = Field(default_factory=list, description="Source document IDs")
- extraction_method: str = Field(default="ai_extraction", description="Method used for extraction")
+ createdAt: datetime = Field(default_factory=datetime.now, description="Creation timestamp")
+ sourceDocuments: List[str] = Field(default_factory=list, description="Source document IDs")
+ extractionMethod: str = Field(default="ai_extraction", description="Method used for extraction")
version: str = Field(default="1.0", description="Document version")
@@ -31,7 +31,7 @@ class ListItem(BaseModel):
class BulletList(BaseModel):
"""Bulleted or numbered list."""
items: List[ListItem] = Field(description="List items")
- list_type: Literal["bullet", "numbered", "checklist"] = Field(default="bullet", description="List type")
+ listType: Literal["bullet", "numbered", "checklist"] = Field(default="bullet", description="List type")
metadata: Dict[str, Any] = Field(default_factory=dict, description="List metadata")
@@ -59,7 +59,7 @@ class CodeBlock(BaseModel):
class Image(BaseModel):
"""Image with metadata."""
data: str = Field(description="Base64 encoded image data")
- alt_text: Optional[str] = Field(default=None, description="Alternative text")
+ altText: Optional[str] = Field(default=None, description="Alternative text")
caption: Optional[str] = Field(default=None, description="Image caption")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Image metadata")
@@ -68,7 +68,7 @@ class DocumentSection(BaseModel):
"""A section of the document containing one or more content elements."""
id: str = Field(description="Unique section identifier")
title: Optional[str] = Field(default=None, description="Section title")
- content_type: Literal["table", "list", "paragraph", "heading", "code", "image", "mixed"] = Field(description="Primary content type")
+ contentType: Literal["table", "list", "paragraph", "heading", "code", "image", "mixed"] = Field(description="Primary content type")
elements: List[Union[TableData, BulletList, Paragraph, Heading, CodeBlock, Image]] = Field(description="Content elements in this section")
order: int = Field(description="Section order in document")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Section metadata")
@@ -81,9 +81,9 @@ class StructuredDocument(BaseModel):
summary: Optional[str] = Field(default=None, description="Document summary")
tags: List[str] = Field(default_factory=list, description="Document tags")
- def getSectionsByType(self, content_type: str) -> List[DocumentSection]:
+ def getSectionsByType(self, contentType: str) -> List[DocumentSection]:
"""Get all sections of a specific content type."""
- return [section for section in self.sections if section.content_type == content_type]
+ return [section for section in self.sections if section.contentType == contentType]
def getAllTables(self) -> List[TableData]:
"""Get all table data from the document."""
@@ -104,22 +104,6 @@ class StructuredDocument(BaseModel):
return lists
-class JsonChunkResult(BaseModel):
- """Result from processing a single chunk with JSON output."""
- chunk_id: str = Field(description="Chunk identifier")
- document_section: DocumentSection = Field(description="Structured content from this chunk")
- processing_time: float = Field(description="Processing time in seconds")
- metadata: Dict[str, Any] = Field(default_factory=dict, description="Chunk processing metadata")
-
-
-class JsonMergeResult(BaseModel):
- """Result from merging multiple JSON chunks."""
- merged_document: StructuredDocument = Field(description="Merged structured document")
- merge_strategy: str = Field(description="Strategy used for merging")
- chunks_processed: int = Field(description="Number of chunks processed")
- merge_time: float = Field(description="Time taken to merge chunks")
- metadata: Dict[str, Any] = Field(default_factory=dict, description="Merge process metadata")
-
# Update forward references
ListItem.model_rebuild()
diff --git a/modules/workflows/processing/adaptive/contentValidator.py b/modules/workflows/processing/adaptive/contentValidator.py
index 862e36d2..2709af7b 100644
--- a/modules/workflows/processing/adaptive/contentValidator.py
+++ b/modules/workflows/processing/adaptive/contentValidator.py
@@ -22,9 +22,15 @@ class ContentValidator:
self.services = services
self.learningEngine = learningEngine
- async def validateContent(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]:
- """Validates delivered content against user intent using AI (single attempt; parse-or-fail)"""
- return await self._validateWithAI(documents, intent)
+ async def validateContent(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None) -> Dict[str, Any]:
+ """Validates delivered content against user intent using AI (single attempt; parse-or-fail)
+
+ Args:
+ documents: List of documents to validate
+ intent: Workflow-level intent dict (for format requirements)
+ taskStep: Optional TaskStep object (preferred source for objective)
+ """
+ return await self._validateWithAI(documents, intent, taskStep)
def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
"""Generic document analysis - create simple summaries with metadata."""
@@ -242,21 +248,58 @@ class ContentValidator:
return False
- async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]:
+ async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None) -> Dict[str, Any]:
"""AI-based comprehensive validation - generic approach"""
try:
if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
return self._createFailedValidationResult("AI service not available")
+ # Use taskStep.objective if available, otherwise fall back to intent.primaryGoal
+ taskObjective = None
+ if taskStep and hasattr(taskStep, 'objective'):
+ taskObjective = taskStep.objective
+ elif taskStep and isinstance(taskStep, dict):
+ taskObjective = taskStep.get('objective')
+
+ # Use taskStep format fields if available, otherwise fall back to intent
+ dataType = None
+ expectedFormat = None
+ if taskStep:
+ if hasattr(taskStep, 'dataType') and taskStep.dataType:
+ dataType = taskStep.dataType
+ elif isinstance(taskStep, dict):
+ dataType = taskStep.get('dataType')
+ if hasattr(taskStep, 'expectedFormat') and taskStep.expectedFormat:
+ expectedFormat = taskStep.expectedFormat
+ elif isinstance(taskStep, dict):
+ expectedFormat = taskStep.get('expectedFormat')
+
+ # Fallback to intent if taskStep format fields not available
+ if not dataType:
+ dataType = intent.get('dataType', 'unknown')
+ if not expectedFormat:
+ expectedFormat = intent.get('expectedFormat', 'unknown')
+
+ # Determine objective text and label
+ objectiveText = taskObjective if taskObjective else intent.get('primaryGoal', 'Unknown')
+ objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"
+
# Build prompt base WITHOUT document summaries first
- successCriteria = intent.get('successCriteria', [])
+ # Use success criteria from taskStep if available, otherwise from intent
+ successCriteria = []
+ if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
+ successCriteria = taskStep.successCriteria
+ elif taskStep and isinstance(taskStep, dict):
+ successCriteria = taskStep.get('successCriteria', [])
+ else:
+ successCriteria = intent.get('successCriteria', [])
criteriaCount = len(successCriteria)
promptBase = f"""TASK VALIDATION
-USER REQUEST: '{intent.get('primaryGoal', 'Unknown')}'
-EXPECTED DATA TYPE: {intent.get('dataType', 'unknown')}
-EXPECTED FORMAT: {intent.get('expectedFormat', 'unknown')}
+{objectiveLabel}: '{objectiveText}'
+EXPECTED DATA TYPE: {dataType}
+EXPECTED FORMAT: {expectedFormat}
SUCCESS CRITERIA ({criteriaCount} items): {successCriteria}
VALIDATION RULES:
diff --git a/modules/workflows/processing/adaptive/intentAnalyzer.py b/modules/workflows/processing/adaptive/intentAnalyzer.py
index 2dd99a94..caee37ce 100644
--- a/modules/workflows/processing/adaptive/intentAnalyzer.py
+++ b/modules/workflows/processing/adaptive/intentAnalyzer.py
@@ -27,12 +27,26 @@ class IntentAnalyzer:
return None
# Create AI analysis prompt
+ # Determine if we're in task context (have taskStep) or workflow context
+ isTaskContext = hasattr(context, 'taskStep') and context.taskStep is not None
+ contextObjective = getattr(context.taskStep, 'objective', '') if isTaskContext else ''
+
+ # Use appropriate label based on context
+ if isTaskContext:
+ # Task context: use OBJECTIVE label and only task objective
+ requestLabel = "OBJECTIVE"
+ contextInfo = f"OBJECTIVE: {self.services.utils.sanitizePromptContent(contextObjective, 'userinput')}"
+ else:
+ # Workflow context: use USER REQUEST label
+ requestLabel = "USER REQUEST"
+ contextInfo = f"CONTEXT: {self.services.utils.sanitizePromptContent(contextObjective, 'userinput') if contextObjective else 'None'}"
+
analysisPrompt = f"""
You are an intent analyzer. Analyze the user's request to understand what they want delivered.
-USER REQUEST: {self.services.utils.sanitizePromptContent(userPrompt, 'userinput')}
+{requestLabel}: {self.services.utils.sanitizePromptContent(userPrompt, 'userinput')}
-CONTEXT: {getattr(context.task_step, 'objective', '') if hasattr(context, 'task_step') and context.task_step else ''}
+{contextInfo}
Analyze the user's intent and determine:
1. What type of data/content they want (numbers, text, documents, analysis, code, etc.)
diff --git a/modules/workflows/processing/adaptive/learningEngine.py b/modules/workflows/processing/adaptive/learningEngine.py
index 29cfd53b..7e8fe60b 100644
--- a/modules/workflows/processing/adaptive/learningEngine.py
+++ b/modules/workflows/processing/adaptive/learningEngine.py
@@ -170,9 +170,9 @@ class LearningEngine:
"""Serializes context for storage"""
try:
return {
- "taskObjective": getattr(context, 'task_step', {}).get('objective', '') if hasattr(context, 'task_step') else '',
- "workflowId": getattr(context, 'workflow_id', ''),
- "availableDocuments": getattr(context, 'available_documents', [])
+ "taskObjective": getattr(context, 'taskStep', {}).get('objective', '') if hasattr(context, 'taskStep') else '',
+ "workflowId": getattr(context, 'workflowId', ''),
+ "availableDocuments": getattr(context, 'availableDocuments', [])
}
except Exception:
return {}
diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py
index bbae5610..53425fc2 100644
--- a/modules/workflows/processing/core/messageCreator.py
+++ b/modules/workflows/processing/core/messageCreator.py
@@ -201,12 +201,12 @@ class MessageCreator:
completionMessage = f"🎯 **Task {taskProgress}**\n\n✅ {reviewResult.reason or 'Task completed successfully'}"
# Add criteria status if available
- if hasattr(reviewResult, 'met_criteria') and reviewResult.met_criteria:
- for criterion in reviewResult.met_criteria:
+ if hasattr(reviewResult, 'metCriteria') and reviewResult.metCriteria:
+ for criterion in reviewResult.metCriteria:
completionMessage += f"\n• {criterion}"
- if hasattr(reviewResult, 'quality_score'):
- completionMessage += f"\n📊 Score {reviewResult.quality_score}/10"
+ if hasattr(reviewResult, 'qualityScore'):
+ completionMessage += f"\n📊 Score {reviewResult.qualityScore}/10"
taskCompletionMessage = {
"workflowId": workflow.id,
diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py
index 9d1091bc..cc66a264 100644
--- a/modules/workflows/processing/core/taskPlanner.py
+++ b/modules/workflows/processing/core/taskPlanner.py
@@ -52,9 +52,14 @@ class TaskPlanner:
self._checkWorkflowStopped(workflow)
# Analyze user intent to obtain cleaned user objective for planning
+ # This intent will be reused for workflow-level validation in executeTask
+ from modules.workflows.processing.adaptive import IntentAnalyzer
intentAnalyzer = IntentAnalyzer(self.services)
- intent = await intentAnalyzer.analyzeUserIntent(actualUserPrompt, None)
- cleanedObjective = intent.get('primaryGoal', actualUserPrompt) if isinstance(intent, dict) else actualUserPrompt
+ workflowIntent = await intentAnalyzer.analyzeUserIntent(actualUserPrompt, None)
+ # Store workflow intent for reuse in executeTask (avoid redundant analysis)
+ if not hasattr(workflow, '_workflowIntent'):
+ workflow._workflowIntent = workflowIntent
+ cleanedObjective = workflowIntent.get('primaryGoal', actualUserPrompt) if isinstance(workflowIntent, dict) else actualUserPrompt
# Create proper context object for task planning using cleaned intent
# For task planning, we need to create a minimal TaskStep since TaskContext requires it
@@ -62,27 +67,27 @@ class TaskPlanner:
id="plan",
objective=cleanedObjective,
dependencies=[],
- success_criteria=[],
- estimated_complexity="medium"
+ successCriteria=[],
+ estimatedComplexity="medium"
)
taskPlanningContext = TaskContext(
- task_step=planningTaskStep,
+ taskStep=planningTaskStep,
workflow=workflow,
- workflow_id=workflow.id,
- available_documents=None,
- available_connections=None,
- previous_results=[],
- previous_handover=None,
+ workflowId=workflow.id,
+ availableDocuments=None,
+ availableConnections=None,
+ previousResults=[],
+ previousHandover=None,
improvements=[],
- retry_count=0,
- previous_action_results=[],
- previous_review_result=None,
- is_regeneration=False,
- failure_patterns=[],
- failed_actions=[],
- successful_actions=[],
- criteria_progress={
+ retryCount=0,
+ previousActionResults=[],
+ previousReviewResult=None,
+ isRegeneration=False,
+ failurePatterns=[],
+ failedActions=[],
+ successfulActions=[],
+ criteriaProgress={
'met_criteria': set(),
'unmet_criteria': set(),
'attempt_history': []
@@ -153,6 +158,16 @@ class TaskPlanner:
if 'description' in taskDict and 'objective' not in taskDict:
taskDict['objective'] = taskDict.pop('description')
+ # Extract format details from workflow intent and populate TaskStep
+ # Use workflow-level intent for format requirements (tasks inherit from workflow)
+ if isinstance(workflowIntent, dict):
+ if 'dataType' in workflowIntent and 'dataType' not in taskDict:
+ taskDict['dataType'] = workflowIntent.get('dataType')
+ if 'expectedFormat' in workflowIntent and 'expectedFormat' not in taskDict:
+ taskDict['expectedFormat'] = workflowIntent.get('expectedFormat')
+ if 'qualityRequirements' in workflowIntent and 'qualityRequirements' not in taskDict:
+ taskDict['qualityRequirements'] = workflowIntent.get('qualityRequirements')
+
try:
task = TaskStep(**taskDict)
tasks.append(task)
@@ -206,7 +221,7 @@ class TaskPlanner:
logger.error(f"Task {i} is not a dictionary: {type(task)}")
return False
- requiredFields = ['id', 'objective', 'success_criteria']
+ requiredFields = ['id', 'objective', 'successCriteria']
missingFields = [field for field in requiredFields if field not in task]
if missingFields:
logger.error(f"Task {i} missing required fields: {missingFields}")
diff --git a/modules/workflows/processing/core/validator.py b/modules/workflows/processing/core/validator.py
index f28727ba..a99c0ec9 100644
--- a/modules/workflows/processing/core/validator.py
+++ b/modules/workflows/processing/core/validator.py
@@ -40,7 +40,7 @@ class WorkflowValidator:
logger.error(f"Task {i} is not a dictionary: {type(task)}")
return False
- requiredFields = ['id', 'objective', 'success_criteria']
+ requiredFields = ['id', 'objective', 'successCriteria']
missingFields = [field for field in requiredFields if field not in task]
if missingFields:
logger.error(f"Task {i} missing required fields: {missingFields}")
diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py
index 06a8ae4b..40ce267c 100644
--- a/modules/workflows/processing/modes/modeActionplan.py
+++ b/modules/workflows/processing/modes/modeActionplan.py
@@ -4,6 +4,7 @@
import json
import logging
import uuid
+from datetime import datetime, timezone
from typing import List, Dict, Any
from modules.datamodels.datamodelChat import (
TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus,
@@ -17,6 +18,8 @@ from modules.workflows.processing.shared.promptGenerationActionsActionplan impor
generateActionDefinitionPrompt,
generateResultReviewPrompt
)
+from modules.workflows.processing.adaptive import IntentAnalyzer, ContentValidator, LearningEngine, ProgressTracker
+from modules.workflows.processing.adaptive.adaptiveLearningEngine import AdaptiveLearningEngine
logger = logging.getLogger(__name__)
@@ -25,6 +28,14 @@ class ActionplanMode(BaseMode):
def __init__(self, services, workflow):
super().__init__(services, workflow)
+ # Initialize adaptive components for enhanced validation and learning
+ self.intentAnalyzer = IntentAnalyzer(services)
+ self.learningEngine = LearningEngine()
+ self.adaptiveLearningEngine = AdaptiveLearningEngine()
+ self.contentValidator = ContentValidator(services, self.adaptiveLearningEngine)
+ self.progressTracker = ProgressTracker()
+ self.workflowIntent = None
+ self.taskIntent = None
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]:
@@ -33,13 +44,13 @@ class ActionplanMode(BaseMode):
# Check workflow status before generating actions
self._checkWorkflowStopped(workflow)
- retryInfo = f" (Retry #{enhancedContext.retry_count})" if enhancedContext and enhancedContext.retry_count > 0 else ""
+ retryInfo = f" (Retry #{enhancedContext.retryCount})" if enhancedContext and enhancedContext.retryCount > 0 else ""
logger.info(f"Generating actions for task: {taskStep.objective}{retryInfo}")
# Log criteria progress if this is a retry
- if enhancedContext and hasattr(enhancedContext, 'criteria_progress') and enhancedContext.criteria_progress is not None:
- progress = enhancedContext.criteria_progress
- logger.info(f"Retry attempt {enhancedContext.retry_count} - Criteria progress:")
+ if enhancedContext and hasattr(enhancedContext, 'criteriaProgress') and enhancedContext.criteriaProgress is not None:
+ progress = enhancedContext.criteriaProgress
+ logger.info(f"Retry attempt {enhancedContext.retryCount} - Criteria progress:")
if progress.get('met_criteria'):
logger.info(f" Met criteria: {', '.join(progress['met_criteria'])}")
if progress.get('unmet_criteria'):
@@ -59,14 +70,14 @@ class ActionplanMode(BaseMode):
logger.info(f" Quality stable: {currScore}")
# Enhanced retry context logging
- if enhancedContext and enhancedContext.retry_count > 0:
+ if enhancedContext and enhancedContext.retryCount > 0:
logger.info("=== RETRY CONTEXT FOR ACTION GENERATION ===")
- logger.info(f"Retry Count: {enhancedContext.retry_count}")
+ logger.info(f"Retry Count: {enhancedContext.retryCount}")
logger.debug(f"Previous Improvements: {enhancedContext.improvements}")
- logger.debug(f"Previous Review Result: {enhancedContext.previous_review_result}")
- logger.debug(f"Failure Patterns: {enhancedContext.failure_patterns}")
- logger.debug(f"Failed Actions: {enhancedContext.failed_actions}")
- logger.debug(f"Successful Actions: {enhancedContext.successful_actions}")
+ logger.debug(f"Previous Review Result: {enhancedContext.previousReviewResult}")
+ logger.debug(f"Failure Patterns: {enhancedContext.failurePatterns}")
+ logger.debug(f"Failed Actions: {enhancedContext.failedActions}")
+ logger.debug(f"Successful Actions: {enhancedContext.successfulActions}")
logger.info("=== END RETRY CONTEXT ===")
# Log that we're starting action generation
@@ -76,42 +87,42 @@ class ActionplanMode(BaseMode):
if enhancedContext and isinstance(enhancedContext, TaskContext):
# Use existing TaskContext if provided
actionContext = TaskContext(
- task_step=enhancedContext.task_step,
+ taskStep=enhancedContext.taskStep,
workflow=enhancedContext.workflow,
- workflow_id=enhancedContext.workflow_id,
- available_documents=enhancedContext.available_documents,
- available_connections=enhancedContext.available_connections,
- previous_results=enhancedContext.previous_results or previousResults or [],
- previous_handover=enhancedContext.previous_handover,
+ workflowId=enhancedContext.workflowId,
+ availableDocuments=enhancedContext.availableDocuments,
+ availableConnections=enhancedContext.availableConnections,
+ previousResults=enhancedContext.previousResults or previousResults or [],
+ previousHandover=enhancedContext.previousHandover,
improvements=enhancedContext.improvements or [],
- retry_count=enhancedContext.retry_count or 0,
- previous_action_results=enhancedContext.previous_action_results or [],
- previous_review_result=enhancedContext.previous_review_result,
- is_regeneration=enhancedContext.is_regeneration or False,
- failure_patterns=enhancedContext.failure_patterns or [],
- failed_actions=enhancedContext.failed_actions or [],
- successful_actions=enhancedContext.successful_actions or [],
- criteria_progress=enhancedContext.criteria_progress
+ retryCount=enhancedContext.retryCount or 0,
+ previousActionResults=enhancedContext.previousActionResults or [],
+ previousReviewResult=enhancedContext.previousReviewResult,
+ isRegeneration=enhancedContext.isRegeneration or False,
+ failurePatterns=enhancedContext.failurePatterns or [],
+ failedActions=enhancedContext.failedActions or [],
+ successfulActions=enhancedContext.successfulActions or [],
+ criteriaProgress=enhancedContext.criteriaProgress
)
else:
# Create new context from scratch
actionContext = TaskContext(
- task_step=taskStep,
+ taskStep=taskStep,
workflow=workflow,
- workflow_id=workflow.id,
- available_documents=None,
- available_connections=None,
- previous_results=previousResults or [],
- previous_handover=None,
+ workflowId=workflow.id,
+ availableDocuments=None,
+ availableConnections=None,
+ previousResults=previousResults or [],
+ previousHandover=None,
improvements=[],
- retry_count=0,
- previous_action_results=[],
- previous_review_result=None,
- is_regeneration=False,
- failure_patterns=[],
- failed_actions=[],
- successful_actions=[],
- criteria_progress=None
+ retryCount=0,
+ previousActionResults=[],
+ previousReviewResult=None,
+ isRegeneration=False,
+ failurePatterns=[],
+ failedActions=[],
+ successfulActions=[],
+ criteriaProgress=None
)
# Check workflow status before calling AI service
@@ -220,6 +231,27 @@ class ActionplanMode(BaseMode):
"""Execute all actions for a task step using Actionplan mode"""
logger.info(f"=== STARTING TASK {taskIndex or '?'}: {taskStep.objective} ===")
+ # Use workflow-level intent from planning phase (stored in workflow object)
+ # This avoids redundant intent analysis - intent was already analyzed during task planning
+ if hasattr(workflow, '_workflowIntent') and workflow._workflowIntent:
+ self.workflowIntent = workflow._workflowIntent
+ logger.info(f"Using workflow intent from planning phase")
+ else:
+ # Fallback: analyze if not available (shouldn't happen in normal flow)
+ originalPrompt = self.services.currentUserPrompt if self.services and hasattr(self.services, 'currentUserPrompt') else taskStep.objective
+ self.workflowIntent = await self.intentAnalyzer.analyzeUserIntent(originalPrompt, context)
+ logger.warning(f"Workflow intent not found in workflow object, analyzed fresh")
+
+ # Task-level intent is NOT needed - use task.objective + task format fields (dataType, expectedFormat, qualityRequirements)
+ # These format fields are populated from workflow intent during task planning
+ self.taskIntent = None # Removed redundant task-level intent analysis
+ logger.info(f"Workflow intent: {self.workflowIntent}")
+ if taskStep.dataType or taskStep.expectedFormat or taskStep.qualityRequirements:
+ logger.info(f"Task format info: dataType={taskStep.dataType}, expectedFormat={taskStep.expectedFormat}")
+
+ # Reset progress tracking for new task
+ self.progressTracker.reset()
+
# Update workflow object before executing task
if taskIndex is not None:
self._updateWorkflowBeforeExecutingTask(taskIndex)
@@ -243,10 +275,10 @@ class ActionplanMode(BaseMode):
# Update retry context with current attempt information
if retryContext:
- retryContext.retry_count = attempt + 1
+ retryContext.retryCount = attempt + 1
actions = await self.generateActionItems(taskStep, workflow,
- previousResults=retryContext.previous_results,
+ previousResults=retryContext.previousResults,
enhancedContext=retryContext)
# Log total actions count for this task
@@ -302,6 +334,36 @@ class ActionplanMode(BaseMode):
taskIndex, actionNumber, totalActions)
actionResults.append(result)
+ # Enhanced validation: Content validation after each action (like React mode)
+ if getattr(self, 'workflowIntent', None) and result.documents:
+ # Pass ALL documents to validator - validator decides what to validate (generic approach)
+ # Pass taskStep so validator can use task.objective and format fields
+ validationResult = await self.contentValidator.validateContent(result.documents, self.workflowIntent, taskStep)
+ qualityScore = validationResult.get('qualityScore', 0.0)
+ if qualityScore is None:
+ qualityScore = 0.0
+ logger.info(f"Content validation for action {actionNumber}: {validationResult['overallSuccess']} (quality: {qualityScore:.2f})")
+
+ # Record validation result for adaptive learning
+ actionContext = {
+ 'actionName': f"{action.execMethod}.{action.execAction}",
+ 'workflowId': context.workflowId
+ }
+
+ self.adaptiveLearningEngine.recordValidationResult(
+ validationResult,
+ actionContext,
+ context.workflowId,
+ actionNumber
+ )
+
+ # Learn from feedback
+ feedback = self._collectFeedback(result, validationResult, self.workflowIntent)
+ self.learningEngine.learnFromFeedback(feedback, context, self.workflowIntent)
+
+ # Update progress
+ self.progressTracker.updateOperation(result, validationResult, self.workflowIntent)
+
if result.success:
state.addSuccessfulAction(result)
else:
@@ -333,47 +395,47 @@ class ActionplanMode(BaseMode):
logger.warning(f"Task step '{taskStep.objective}' requires retry: {reviewResult.improvements}")
# Enhanced logging of criteria status
- if reviewResult.met_criteria:
- logger.info(f"Met criteria: {', '.join(reviewResult.met_criteria)}")
- if reviewResult.unmet_criteria:
- logger.warning(f"Unmet criteria: {', '.join(reviewResult.unmet_criteria)}")
+ if reviewResult.metCriteria:
+ logger.info(f"Met criteria: {', '.join(reviewResult.metCriteria)}")
+ if reviewResult.unmetCriteria:
+ logger.warning(f"Unmet criteria: {', '.join(reviewResult.unmetCriteria)}")
state.incrementRetryCount()
# Update retry context with retry information and criteria tracking
if retryContext:
- retryContext.retry_count = state.retry_count
+ retryContext.retryCount = state.retry_count
retryContext.improvements = reviewResult.improvements
- retryContext.previous_action_results = actionResults
- retryContext.previous_review_result = reviewResult
- retryContext.is_regeneration = True
- retryContext.failure_patterns = state.getFailurePatterns()
- retryContext.failed_actions = state.failed_actions
- retryContext.successful_actions = state.successful_actions
+ retryContext.previousActionResults = actionResults
+ retryContext.previousReviewResult = reviewResult
+ retryContext.isRegeneration = True
+ retryContext.failurePatterns = state.getFailurePatterns()
+ retryContext.failedActions = state.failed_actions
+ retryContext.successfulActions = state.successful_actions
# Track criteria progress across retries
- if not hasattr(retryContext, 'criteria_progress'):
- retryContext.criteria_progress = {
+ if not hasattr(retryContext, 'criteriaProgress'):
+ retryContext.criteriaProgress = {
'met_criteria': set(),
'unmet_criteria': set(),
'attempt_history': []
}
# Update criteria progress
- if reviewResult.met_criteria:
- retryContext.criteria_progress['met_criteria'].update(reviewResult.met_criteria)
- if reviewResult.unmet_criteria:
- retryContext.criteria_progress['unmet_criteria'].update(reviewResult.unmet_criteria)
+ if reviewResult.metCriteria:
+ retryContext.criteriaProgress['met_criteria'].update(reviewResult.metCriteria)
+ if reviewResult.unmetCriteria:
+ retryContext.criteriaProgress['unmet_criteria'].update(reviewResult.unmetCriteria)
# Record this attempt's criteria status
attemptRecord = {
'attempt': state.retry_count,
- 'met_criteria': reviewResult.met_criteria or [],
- 'unmet_criteria': reviewResult.unmet_criteria or [],
- 'quality_score': reviewResult.quality_score,
+ 'met_criteria': reviewResult.metCriteria or [],
+ 'unmet_criteria': reviewResult.unmetCriteria or [],
+ 'quality_score': reviewResult.qualityScore,
'improvements': reviewResult.improvements or []
}
- retryContext.criteria_progress['attempt_history'].append(attemptRecord)
+ retryContext.criteriaProgress['attempt_history'].append(attemptRecord)
# Create retry message
await self.messageCreator.createRetryMessage(taskStep, workflow, taskIndex, reviewResult)
@@ -420,10 +482,10 @@ class ActionplanMode(BaseMode):
# Create proper context object for result review
reviewContext = ReviewContext(
- task_step=taskStep,
- task_actions=taskActions,
- action_results=actionResults,
- step_result={
+ taskStep=taskStep,
+ taskActions=taskActions,
+ actionResults=actionResults,
+ stepResult={
'successful_actions': sum(1 for result in actionResults if result.success),
'total_actions': len(actionResults),
'results': [self._extractResultText(result) for result in actionResults if result.success],
@@ -437,8 +499,8 @@ class ActionplanMode(BaseMode):
for i, result in enumerate(actionResults)
]
},
- workflow_id=workflow.id,
- previous_results=[]
+ workflowId=workflow.id,
+ previousResults=[]
)
# Check workflow status before calling AI service
@@ -452,8 +514,8 @@ class ActionplanMode(BaseMode):
# Log result review prompt sent to AI
logger.info("=== RESULT REVIEW PROMPT SENT TO AI ===")
logger.info(f"Task: {taskStep.objective}")
- logger.info(f"Action Results Count: {len(reviewContext.action_results) if reviewContext.action_results else 0}")
- logger.info(f"Task Actions Count: {len(reviewContext.task_actions) if reviewContext.task_actions else 0}")
+ logger.info(f"Action Results Count: {len(reviewContext.actionResults) if reviewContext.actionResults else 0}")
+ logger.info(f"Task Actions Count: {len(reviewContext.taskActions) if reviewContext.taskActions else 0}")
# Centralized AI call: Result validation (balanced analysis) with placeholders
options = AiCallOptions(
@@ -488,7 +550,7 @@ class ActionplanMode(BaseMode):
raise ValueError("Review response missing 'status' field")
review.setdefault('status', 'unknown')
review.setdefault('reason', 'No reason provided')
- review.setdefault('quality_score', 5)
+ review.setdefault('quality_score', 5.0)
# Ensure improvements is a list
improvements = review.get('improvements', [])
@@ -511,31 +573,31 @@ class ActionplanMode(BaseMode):
status=review.get('status', 'unknown'),
reason=review.get('reason', 'No reason provided'),
improvements=improvements,
- quality_score=review.get('quality_score', 5),
- missing_outputs=[],
- met_criteria=metCriteria,
- unmet_criteria=unmetCriteria,
+ qualityScore=float(review.get('quality_score', review.get('qualityScore', 5.0))),
+ missingOutputs=[],
+ metCriteria=metCriteria,
+ unmetCriteria=unmetCriteria,
confidence=review.get('confidence', 0.5),
# Extract user-friendly message if available
userMessage=review.get('userMessage', None)
)
# Enhanced validation logging
- logger.info(f"VALIDATION RESULT - Task: '{taskStep.objective}' - Status: {reviewResult.status.upper()}, Quality: {reviewResult.quality_score}/10")
+ logger.info(f"VALIDATION RESULT - Task: '{taskStep.objective}' - Status: {reviewResult.status.upper()}, Quality: {reviewResult.qualityScore}/10")
if reviewResult.status == 'success':
logger.info(f"VALIDATION SUCCESS - Task completed successfully")
- if reviewResult.met_criteria:
- logger.info(f"Met criteria: {', '.join(reviewResult.met_criteria)}")
+ if reviewResult.metCriteria:
+ logger.info(f"Met criteria: {', '.join(reviewResult.metCriteria)}")
elif reviewResult.status == 'retry':
logger.warning(f"VALIDATION RETRY - Task requires retry: {reviewResult.improvements}")
- if reviewResult.unmet_criteria:
- logger.warning(f"Unmet criteria: {', '.join(reviewResult.unmet_criteria)}")
+ if reviewResult.unmetCriteria:
+ logger.warning(f"Unmet criteria: {', '.join(reviewResult.unmetCriteria)}")
else:
logger.error(f"VALIDATION FAILED - Task failed: {reviewResult.reason}")
logger.info(f"=== TASK COMPLETION REVIEW FINISHED ===")
logger.info(f"Final Status: {reviewResult.status}")
- logger.info(f"Quality Score: {reviewResult.quality_score}/10")
+ logger.info(f"Quality Score: {reviewResult.qualityScore}/10")
logger.info(f"Improvements: {reviewResult.improvements}")
logger.info("=== END REVIEW ===")
@@ -545,7 +607,7 @@ class ActionplanMode(BaseMode):
return ReviewResult(
status='failed',
reason=str(e),
- quality_score=0
+ qualityScore=0.0
)
def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem:
@@ -613,6 +675,47 @@ class ActionplanMode(BaseMode):
# Join all document results with separators
return "\n\n---\n\n".join(resultParts) if resultParts else ""
+ def _collectFeedback(self, result: Any, validation: Dict[str, Any], intent: Dict[str, Any]) -> Dict[str, Any]:
+ """Collects comprehensive feedback from action execution"""
+ try:
+ # Extract content summary
+ contentDelivered = ""
+ if result.documents:
+ firstDoc = result.documents[0]
+ if hasattr(firstDoc, 'documentData'):
+ data = firstDoc.documentData
+ if isinstance(data, dict) and 'content' in data:
+ content = str(data['content'])
+ contentDelivered = content[:100] + "..." if len(content) > 100 else content
+ else:
+ contentDelivered = str(data)[:100] + "..." if len(str(data)) > 100 else str(data)
+
+ return {
+ "actionAttempted": result.resultLabel or "unknown",
+ "parametersUsed": {}, # Would be extracted from action context
+ "contentDelivered": contentDelivered,
+ "intentMatchScore": validation.get('qualityScore', 0),
+ "qualityScore": validation.get('qualityScore', 0),
+ "issuesFound": validation.get('improvementSuggestions', []),
+ "learningOpportunities": validation.get('improvementSuggestions', []),
+ "userSatisfaction": None, # Would be collected from user feedback
+ "timestamp": datetime.now(timezone.utc).timestamp()
+ }
+
+ except Exception as e:
+ logger.error(f"Error collecting feedback: {str(e)}")
+ return {
+ "actionAttempted": "unknown",
+ "parametersUsed": {},
+ "contentDelivered": "",
+ "intentMatchScore": 0,
+ "qualityScore": 0,
+ "issuesFound": [],
+ "learningOpportunities": [],
+ "userSatisfaction": None,
+ "timestamp": datetime.now(timezone.utc).timestamp()
+ }
+
def _updateWorkflowBeforeExecutingTask(self, taskNumber: int):
"""Update workflow object before executing a task"""
try:
@@ -689,57 +792,4 @@ class ActionplanMode(BaseMode):
logger.debug(f"Updated workflow totals: Tasks {self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 'N/A'}, Actions {self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 'N/A'}")
except Exception as e:
logger.error(f"Error setting workflow totals: {str(e)}")
-
- def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem:
- """Creates a new task action"""
- try:
- import uuid
-
- # Ensure ID is present
- if "id" not in actionData or not actionData["id"]:
- actionData["id"] = f"action_{uuid.uuid4()}"
-
- # Ensure required fields
- if "status" not in actionData:
- actionData["status"] = TaskStatus.PENDING
-
- if "execMethod" not in actionData:
- logger.error("execMethod is required for task action")
- return None
-
- if "execAction" not in actionData:
- logger.error("execAction is required for task action")
- return None
-
- if "execParameters" not in actionData:
- actionData["execParameters"] = {}
-
- # Use generic field separation based on ActionItem model
- simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData)
-
- # Create action in database
- createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)
-
- # Convert to ActionItem model
- return ActionItem(
- id=createdAction["id"],
- execMethod=createdAction["execMethod"],
- execAction=createdAction["execAction"],
- execParameters=createdAction.get("execParameters", {}),
- execResultLabel=createdAction.get("execResultLabel"),
- expectedDocumentFormats=createdAction.get("expectedDocumentFormats"),
- status=createdAction.get("status", TaskStatus.PENDING),
- error=createdAction.get("error"),
- retryCount=createdAction.get("retryCount", 0),
- retryMax=createdAction.get("retryMax", 3),
- processingTime=createdAction.get("processingTime"),
- timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
- result=createdAction.get("result"),
- resultDocuments=createdAction.get("resultDocuments", []),
- userMessage=createdAction.get("userMessage")
- )
-
- except Exception as e:
- logger.error(f"Error creating task action: {str(e)}")
- return None
diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py
index 3b8d9910..32bd5d62 100644
--- a/modules/workflows/processing/modes/modeReact.py
+++ b/modules/workflows/processing/modes/modeReact.py
@@ -9,7 +9,7 @@ from datetime import datetime, timezone
from typing import List, Dict, Any
from modules.datamodels.datamodelChat import (
TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus,
- ActionResult
+ ActionResult, Observation, ObservationPreview, ReviewResult
)
from modules.datamodels.datamodelChat import ChatWorkflow
from modules.workflows.processing.modes.modeBase import BaseMode
@@ -50,14 +50,23 @@ class ReactMode(BaseMode):
"""Execute task using React mode - iterative plan-act-observe-refine loop"""
logger.info(f"=== STARTING TASK {taskIndex or '?'}: {taskStep.objective} ===")
- # NEW: Analyze intents separately for proper validation vs task completion
- # Workflow-level intent from cleaned original user prompt
- original_prompt = self.services.currentUserPrompt if self.services and hasattr(self.services, 'currentUserPrompt') else taskStep.objective
- self.workflowIntent = await self.intentAnalyzer.analyzeUserIntent(original_prompt, context)
- # Task-level intent from current task objective (used only for task-scoped checks)
- self.taskIntent = await self.intentAnalyzer.analyzeUserIntent(taskStep.objective, context)
- logger.info(f"Intent analysis — workflow: {self.workflowIntent}")
- logger.info(f"Intent analysis — task: {self.taskIntent}")
+ # Use workflow-level intent from planning phase (stored in workflow object)
+ # This avoids redundant intent analysis - intent was already analyzed during task planning
+ if hasattr(workflow, '_workflowIntent') and workflow._workflowIntent:
+ self.workflowIntent = workflow._workflowIntent
+ logger.info(f"Using workflow intent from planning phase")
+ else:
+ # Fallback: analyze if not available (shouldn't happen in normal flow)
+ original_prompt = self.services.currentUserPrompt if self.services and hasattr(self.services, 'currentUserPrompt') else taskStep.objective
+ self.workflowIntent = await self.intentAnalyzer.analyzeUserIntent(original_prompt, context)
+ logger.warning(f"Workflow intent not found in workflow object, analyzed fresh")
+
+ # Task-level intent is NOT needed - use task.objective + task format fields (dataType, expectedFormat, qualityRequirements)
+ # These format fields are populated from workflow intent during task planning
+ self.taskIntent = None # Removed redundant task-level intent analysis
+ logger.info(f"Workflow intent: {self.workflowIntent}")
+ if taskStep.dataType or taskStep.expectedFormat or taskStep.qualityRequirements:
+ logger.info(f"Task format info: dataType={taskStep.dataType}, expectedFormat={taskStep.expectedFormat}")
# NEW: Reset progress tracking for new task
self.progressTracker.reset()
@@ -75,7 +84,7 @@ class ReactMode(BaseMode):
logger.info(f"Using React mode execution with max_steps: {state.max_steps}")
step = 1
- lastReviewDict = None
+ decision = None
while step <= state.max_steps:
self._checkWorkflowStopped(workflow)
@@ -93,14 +102,14 @@ class ReactMode(BaseMode):
result = await self._actExecute(context, selection, taskStep, workflow, step)
observation = self._observeBuild(result)
- # Attach deterministic label for clarity
- observation['resultLabel'] = result.resultLabel
+ # Note: resultLabel is already set correctly in _observeBuild from actionResult.resultLabel
# Content validation (against original cleaned user prompt / workflow intent)
if getattr(self, 'workflowIntent', None) and result.documents:
# Pass ALL documents to validator - validator decides what to validate (generic approach)
- validationResult = await self.contentValidator.validateContent(result.documents, self.workflowIntent)
- observation['contentValidation'] = validationResult
+ # Pass taskStep so validator can use task.objective and format fields
+ validationResult = await self.contentValidator.validateContent(result.documents, self.workflowIntent, taskStep)
+ observation.contentValidation = validationResult
quality_score = validationResult.get('qualityScore', 0.0)
if quality_score is None:
quality_score = 0.0
@@ -110,13 +119,13 @@ class ReactMode(BaseMode):
actionValue = selection.get('action', 'unknown')
actionContext = {
'actionName': actionValue,
- 'workflowId': context.workflow_id
+ 'workflowId': context.workflowId
}
self.adaptiveLearningEngine.recordValidationResult(
validationResult,
actionContext,
- context.workflow_id,
+ context.workflowId,
step
)
@@ -130,18 +139,16 @@ class ReactMode(BaseMode):
decision = await self._refineDecide(context, observation)
# Store refinement decision in context for next iteration
- if not hasattr(context, 'previous_review_result') or context.previous_review_result is None:
- context.previous_review_result = []
+ if not hasattr(context, 'previousReviewResult') or context.previousReviewResult is None:
+ context.previousReviewResult = []
if decision: # Only append if decision is not None
- context.previous_review_result.append(decision)
+ context.previousReviewResult.append(decision)
# Update context with learnings from this step
- if decision and isinstance(decision, dict) and decision.get('reason'):
+ if decision and decision.reason:
if not hasattr(context, 'improvements'):
context.improvements = []
- context.improvements.append(f"Step {step}: {decision.get('reason')}")
-
- lastReviewDict = decision if isinstance(decision, dict) else {}
+ context.improvements.append(f"Step {step}: {decision.reason}")
# Create user-friendly message AFTER action execution
# Action completion message is now handled by the standard message creator in _actExecute
@@ -152,8 +159,9 @@ class ReactMode(BaseMode):
# NEW: Use adaptive stopping logic
progressState = self.progressTracker.getCurrentProgress()
- continueByProgress = self.progressTracker.shouldContinue(progressState, observation.get('contentValidation', {}))
- continueByReview = shouldContinue(observation, lastReviewDict, step, state.max_steps)
+ continueByProgress = self.progressTracker.shouldContinue(progressState, observation.contentValidation if observation.contentValidation else {})
+ # Use Observation Pydantic model directly (decision is ReviewResult model)
+ continueByReview = shouldContinue(observation, decision, step, state.max_steps)
if not continueByProgress or not continueByReview:
logger.info(f"Stopping at step {step}: progress={continueByProgress}, review={continueByReview}")
@@ -163,13 +171,23 @@ class ReactMode(BaseMode):
# Summarize task result for react mode
status = TaskStatus.COMPLETED
success = True
- feedback = lastReviewDict.get('reason') if lastReviewDict and isinstance(lastReviewDict, dict) else 'Completed'
- if lastReviewDict and isinstance(lastReviewDict, dict) and lastReviewDict.get('decision') == 'stop':
+ # Get feedback from last decision if available
+ lastDecision = context.previousReviewResult[-1] if hasattr(context, 'previousReviewResult') and context.previousReviewResult else None
+ feedback = lastDecision.reason if lastDecision and isinstance(lastDecision, ReviewResult) else 'Completed'
+ if lastDecision and isinstance(lastDecision, ReviewResult) and lastDecision.status == 'success':
success = True
+ # Create proper ReviewResult for completion message
+ completionReviewResult = ReviewResult(
+ status='success',
+ reason=feedback,
+ qualityScore=lastDecision.qualityScore if lastDecision and isinstance(lastDecision, ReviewResult) else 8.0,
+ metCriteria=[],
+ improvements=[]
+ )
+
# Create task completion message
- await self.messageCreator.createTaskCompletionMessage(taskStep, workflow, taskIndex, totalTasks,
- type('ReviewResult', (), {'reason': feedback, 'met_criteria': [], 'quality_score': 8})())
+ await self.messageCreator.createTaskCompletionMessage(taskStep, workflow, taskIndex, totalTasks, completionReviewResult)
return TaskResult(
taskId=taskStep.id,
@@ -186,6 +204,17 @@ class ReactMode(BaseMode):
placeholders = bundle.placeholders
# Centralized AI call for plan selection (uses static planning parameters)
+ from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
+ # Create options for documentation/consistency (currently not passed to callAiPlanning API)
+ options = AiCallOptions(
+ operationType=OperationTypeEnum.PLAN,
+ priority=PriorityEnum.QUALITY,
+ compressPrompt=False,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.DETAILED,
+ maxCost=0.10,
+ maxProcessingTime=30
+ )
response = await self.services.ai.callAiPlanning(
prompt=promptTemplate,
placeholders=placeholders
@@ -264,9 +293,9 @@ class ReactMode(BaseMode):
from types import SimpleNamespace
stage2Context = SimpleNamespace()
- # Copy essential fields from original context for fallbacks (snake_case for placeholderFactory compatibility)
- stage2Context.task_step = getattr(context, 'task_step', None)
- stage2Context.workflow_id = getattr(context, 'workflow_id', None)
+ # Copy essential fields from original context for fallbacks
+ stage2Context.taskStep = getattr(context, 'taskStep', None)
+ stage2Context.workflowId = getattr(context, 'workflowId', None)
# Set Stage 1 data directly on the permissive context (snake_case for promptGenerationActionsReact compatibility)
if isinstance(selection, dict):
@@ -284,6 +313,17 @@ class ReactMode(BaseMode):
placeholders = bundle.placeholders
# Centralized AI call for parameter suggestion (uses static planning parameters)
+ from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
+ # Create options for documentation/consistency (currently not passed to callAiPlanning API)
+ options = AiCallOptions(
+ operationType=OperationTypeEnum.PLAN,
+ priority=PriorityEnum.QUALITY,
+ compressPrompt=False,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.DETAILED,
+ maxCost=0.10,
+ maxProcessingTime=30
+ )
paramsResp = await self.services.ai.callAiPlanning(
prompt=promptTemplate,
placeholders=placeholders
@@ -351,7 +391,7 @@ class ReactMode(BaseMode):
return result
- def _observeBuild(self, actionResult: ActionResult) -> Dict[str, Any]:
+ def _observeBuild(self, actionResult: ActionResult) -> Observation:
"""Observe: build compact observation object from ActionResult with full document metadata"""
previews = []
notes = []
@@ -359,27 +399,38 @@ class ReactMode(BaseMode):
# Process all documents and show full metadata
for doc in actionResult.documents:
# Extract all available metadata without content
- docMetadata = {
- "name": getattr(doc, 'fileName', None) or getattr(doc, 'documentName', 'Unknown'),
- "mimeType": getattr(doc, 'mimeType', 'Unknown'),
- "size": getattr(doc, 'size', 'Unknown'),
- "created": getattr(doc, 'created', 'Unknown'),
- "modified": getattr(doc, 'modified', 'Unknown'),
- "typeGroup": getattr(doc, 'typeGroup', 'Unknown'),
- "documentId": getattr(doc, 'documentId', 'Unknown'),
- "reference": getattr(doc, 'reference', 'Unknown')
- }
- # Remove 'Unknown' values to keep it clean
- docMetadata = {k: v for k, v in docMetadata.items() if v != 'Unknown'}
+ name = getattr(doc, 'fileName', None) or getattr(doc, 'documentName', 'Unknown')
+ mimeType = getattr(doc, 'mimeType', None)
+ size = getattr(doc, 'size', None)
+ created = getattr(doc, 'created', None)
+ modified = getattr(doc, 'modified', None)
+ typeGroup = getattr(doc, 'typeGroup', None)
+ documentId = getattr(doc, 'documentId', None)
+ reference = getattr(doc, 'reference', None)
# Add content size indicator instead of actual content
+ contentSize = None
if hasattr(doc, 'documentData') and doc.documentData:
if isinstance(doc.documentData, dict) and 'content' in doc.documentData:
contentLength = len(str(doc.documentData['content']))
- docMetadata['contentSize'] = f"{contentLength} characters"
+ contentSize = f"{contentLength} characters"
else:
contentLength = len(str(doc.documentData))
- docMetadata['contentSize'] = f"{contentLength} characters"
+ contentSize = f"{contentLength} characters"
+
+ # Create ObservationPreview with only non-None values
+ preview = ObservationPreview(
+ name=name if name != 'Unknown' else 'Unknown Document',
+ mimeType=mimeType if mimeType and mimeType != 'Unknown' else None,
+ size=str(size) if size and size != 'Unknown' else None,
+ created=str(created) if created and created != 'Unknown' else None,
+ modified=str(modified) if modified and modified != 'Unknown' else None,
+ typeGroup=str(typeGroup) if typeGroup and typeGroup != 'Unknown' else None,
+ documentId=str(documentId) if documentId and documentId != 'Unknown' else None,
+ reference=str(reference) if reference and reference != 'Unknown' else None,
+ contentSize=contentSize
+ )
+ previews.append(preview)
# Extract comment if available
if hasattr(doc, 'documentData') and doc.documentData:
@@ -387,22 +438,21 @@ class ReactMode(BaseMode):
if isinstance(data, dict):
comment = data.get("comment", "")
if comment:
- notes.append(f"Document '{docMetadata.get('name', 'Unknown')}': {comment}")
-
- previews.append(docMetadata)
+ notes.append(f"Document '{name}': {comment}")
- observation = {
- "success": bool(actionResult.success),
- "resultLabel": actionResult.resultLabel or "",
- "documentsCount": len(actionResult.documents) if actionResult.documents else 0,
- "previews": previews,
- "notes": notes
- }
-
- # NEW: Add content analysis if intent is available
- if self.currentIntent and actionResult.documents:
+ # Build observation with optional content analysis
+ contentAnalysis = None
+ if self.currentIntent and actionResult and actionResult.documents:
contentAnalysis = self._analyzeContent(actionResult.documents)
- observation['contentAnalysis'] = contentAnalysis
+
+ observation = Observation(
+ success=bool(actionResult.success) if actionResult else False,
+ resultLabel=actionResult.resultLabel or "" if actionResult else "",
+ documentsCount=len(actionResult.documents) if actionResult and actionResult.documents else 0,
+ previews=previews,
+ notes=notes,
+ contentAnalysis=contentAnalysis
+ )
return observation
@@ -529,17 +579,27 @@ class ReactMode(BaseMode):
"timestamp": datetime.now(timezone.utc).timestamp()
}
- async def _refineDecide(self, context: TaskContext, observation: Dict[str, Any]) -> Dict[str, Any]:
+ async def _refineDecide(self, context: TaskContext, observation: Observation) -> ReviewResult:
"""Refine: decide continue or stop, with reason"""
# Create proper ReviewContext for extractReviewContent
from modules.datamodels.datamodelChat import ReviewContext
+ # Convert observation to dict for extractReviewContent (temporary compatibility)
+ observationDict = {
+ 'success': observation.success,
+ 'resultLabel': observation.resultLabel,
+ 'documentsCount': observation.documentsCount,
+ 'previews': [p.model_dump(exclude_none=True) if hasattr(p, 'model_dump') else p.dict() for p in observation.previews] if observation.previews else [],
+ 'notes': observation.notes,
+ 'contentValidation': observation.contentValidation if observation.contentValidation else {},
+ 'contentAnalysis': observation.contentAnalysis if observation.contentAnalysis else {}
+ }
reviewContext = ReviewContext(
- task_step=context.task_step,
- task_actions=[],
- action_results=[], # React mode doesn't have action results in this context
- step_result={'observation': observation},
- workflow_id=context.workflow_id,
- previous_results=[]
+ taskStep=context.taskStep,
+ taskActions=[],
+ actionResults=[], # React mode doesn't have action results in this context
+ stepResult={'observation': observationDict},
+ workflowId=context.workflowId,
+ previousResults=[]
)
baseReviewContent = extractReviewContent(reviewContext)
@@ -547,24 +607,24 @@ class ReactMode(BaseMode):
# NEW: Add content validation to review content
enhancedReviewContent = placeholders.get("REVIEW_CONTENT", "")
- if 'contentValidation' in observation:
- validation = observation['contentValidation']
+ if observation.contentValidation:
+ validation = observation.contentValidation
enhancedReviewContent += f"\n\nCONTENT VALIDATION:\n"
- enhancedReviewContent += f"Overall Success: {validation['overallSuccess']}\n"
+ enhancedReviewContent += f"Overall Success: {validation.get('overallSuccess', False)}\n"
quality_score = validation.get('qualityScore', 0.0)
if quality_score is None:
quality_score = 0.0
enhancedReviewContent += f"Quality Score: {quality_score:.2f}\n"
- if validation['improvementSuggestions']:
+ if validation.get('improvementSuggestions'):
enhancedReviewContent += f"Improvement Suggestions: {', '.join(validation['improvementSuggestions'])}\n"
# NEW: Add content analysis to review content
- if 'contentAnalysis' in observation:
- analysis = observation['contentAnalysis']
+ if observation.contentAnalysis:
+ analysis = observation.contentAnalysis
enhancedReviewContent += f"\nCONTENT ANALYSIS:\n"
- enhancedReviewContent += f"Content Type: {analysis['contentType']}\n"
- enhancedReviewContent += f"Intent Match: {analysis['intentMatch']}\n"
- if analysis['contentSnippet']:
+ enhancedReviewContent += f"Content Type: {analysis.get('contentType', 'unknown')}\n"
+ enhancedReviewContent += f"Intent Match: {analysis.get('intentMatch', False)}\n"
+ if analysis.get('contentSnippet'):
enhancedReviewContent += f"Content Preview: {analysis['contentSnippet']}\n"
# NEW: Add progress state to review content
@@ -585,6 +645,17 @@ class ReactMode(BaseMode):
placeholders = bundle.placeholders
# Centralized AI call for refinement decision (uses static planning parameters)
+ from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
+ # Create options for documentation/consistency (currently not passed to callAiPlanning API)
+ options = AiCallOptions(
+ operationType=OperationTypeEnum.DATA_ANALYSE,
+ priority=PriorityEnum.BALANCED,
+ compressPrompt=True,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.ADVANCED,
+ maxCost=0.05,
+ maxProcessingTime=30
+ )
resp = await self.services.ai.callAiPlanning(
prompt=promptTemplate,
placeholders=placeholders
@@ -592,7 +663,11 @@ class ReactMode(BaseMode):
# More robust JSON extraction
if not resp:
- decision = {"decision": "continue", "reason": "default"}
+ return ReviewResult(
+ status="continue",
+ reason="default",
+ qualityScore=5.0
+ )
else:
# Find JSON boundaries more safely
start_idx = resp.find('{')
@@ -607,16 +682,34 @@ class ReactMode(BaseMode):
decision = json.loads(js)
# Ensure decision is a dictionary
if not isinstance(decision, dict):
- decision = {"decision": "continue", "reason": "default"}
+ return ReviewResult(
+ status="continue",
+ reason="default",
+ qualityScore=5.0
+ )
+
+ # Convert decision dict to ReviewResult model
+ decisionValue = decision.get('decision', 'continue')
+ # Map "stop" to "success" for ReviewResult status
+ status = 'success' if decisionValue == 'stop' else 'continue'
+ return ReviewResult(
+ status=status,
+ reason=decision.get('reason', 'No reason provided'),
+ qualityScore=float(decision.get('quality_score', decision.get('qualityScore', 5.0))),
+ confidence=float(decision.get('confidence', 0.5)),
+ userMessage=decision.get('userMessage', None)
+ )
except Exception as e:
logger.warning(f"Failed to parse refinement decision JSON: {e}")
- decision = {"decision": "continue", "reason": "default"}
-
- return decision
+ return ReviewResult(
+ status="continue",
+ reason="default",
+ qualityScore=5.0
+ )
async def _createReactActionMessage(self, workflow: ChatWorkflow, selection: Dict[str, Any],
step: int, maxSteps: int, taskIndex: int, messageType: str,
- result: ActionResult = None, observation: Dict[str, Any] = None):
+ result: ActionResult = None, observation: Observation = None):
"""Create user-friendly messages for React workflow actions"""
try:
action = selection.get('action', {})
@@ -641,7 +734,7 @@ class ReactMode(BaseMode):
messageContent = f"{successIcon} **Step {step} Complete**\n\n{userMessage}"
status = "step"
actionProgress = "success" if result and result.success else "fail"
- documentsLabel = observation.get('resultLabel') if observation else f"action_{step}_result"
+ documentsLabel = observation.resultLabel if observation else f"action_{step}_result"
else:
return
@@ -690,7 +783,7 @@ Return only the user-friendly message, no technical details."""
return f"Executing {method}.{actionName} action..."
async def _generateActionResultMessage(self, method: str, actionName: str, result: ActionResult,
- observation: Dict[str, Any], userLanguage: str):
+ observation: Observation, userLanguage: str):
"""Generate user-friendly message explaining action results"""
try:
# Build result context
@@ -698,8 +791,8 @@ Return only the user-friendly message, no technical details."""
if result and result.documents:
docCount = len(result.documents)
resultContext = f"Generated {docCount} document(s)"
- elif observation and observation.get('documentsCount', 0) > 0:
- docCount = observation.get('documentsCount', 0)
+ elif observation and observation.documentsCount > 0:
+ docCount = observation.documentsCount
resultContext = f"Generated {docCount} document(s)"
# Create AI prompt for result message
diff --git a/modules/workflows/processing/shared/executionState.py b/modules/workflows/processing/shared/executionState.py
index 2e1cd04f..ba6a7d0c 100644
--- a/modules/workflows/processing/shared/executionState.py
+++ b/modules/workflows/processing/shared/executionState.py
@@ -2,9 +2,8 @@
# Contains all execution state management logic
import logging
-from typing import List
-from modules.datamodels.datamodelChat import TaskStep
-from modules.datamodels.datamodelChat import ActionResult
+from typing import List, Optional
+from modules.datamodels.datamodelChat import TaskStep, ActionResult, Observation
logger = logging.getLogger(__name__)
@@ -57,24 +56,48 @@ class TaskExecutionState:
patterns.append("permission_issues")
return list(set(patterns))
-def shouldContinue(observation, review=None, current_step: int = 0, max_steps: int = 5) -> bool:
+def shouldContinue(observation: Optional[Observation], review=None, current_step: int = 0, max_steps: int = 5) -> bool:
"""Helper to decide if the iterative loop should continue
- - Stop if review indicates 'stop' or success criteria are met
- - Stop on failure with no retry path
+
+ Args:
+ observation: Observation Pydantic model with action execution results
+ review: ReviewResult or dict with review decision (optional)
+ current_step: Current step number in the iteration
+ max_steps: Maximum allowed steps
+
+ Returns:
+ bool: True if loop should continue, False if should stop
+
+ Logic:
- Stop if max steps reached
+ - Stop if review indicates 'stop' or success criteria are met
+ - Continue if observation indicates failure but allow one more step (caller caps by max_steps)
"""
try:
+ # Stop if max steps reached
if current_step >= max_steps:
return False
- if review and isinstance(review, dict):
- decision = review.get('decision') or review.get('status')
- if decision in ('stop', 'success'):
- return False
- # If observation exists but indicates hard failure with no documents repeatedly
- if observation and isinstance(observation, dict):
- if observation.get('success') is False and observation.get('documentsCount', 0) == 0:
- # allow next step once; the caller can cap by max_steps
+
+ # Check review decision (can be ReviewResult model or dict)
+ if review:
+ if hasattr(review, 'status'):
+ # ReviewResult Pydantic model
+ if review.status in ('stop', 'success'):
+ return False
+ elif isinstance(review, dict):
+ # Legacy dict format
+ decision = review.get('decision') or review.get('status')
+ if decision in ('stop', 'success'):
+ return False
+
+ # Check observation: if hard failure with no documents, allow one more step
+ # The caller will enforce max_steps limit
+ if observation:
+ if observation.success is False and observation.documentsCount == 0:
+ # Allow next step once; the caller caps by max_steps
return True
+
return True
- except Exception:
+ except Exception as e:
+ logger.warning(f"Error in shouldContinue: {e}")
return False
\ No newline at end of file
diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py
index a4c148ae..31310d21 100644
--- a/modules/workflows/processing/shared/placeholderFactory.py
+++ b/modules/workflows/processing/shared/placeholderFactory.py
@@ -44,12 +44,12 @@ def extractUserPrompt(context: Any) -> str:
try:
services = getattr(context, 'services', None)
- # Determine raw user prompt from services or task_step
+ # Determine raw user prompt from services or taskStep
rawPrompt = None
if services and getattr(services, 'currentUserPrompt', None):
rawPrompt = services.currentUserPrompt
- elif hasattr(context, 'task_step') and context.task_step:
- rawPrompt = context.task_step.objective or 'No request specified'
+ elif hasattr(context, 'taskStep') and context.taskStep:
+ rawPrompt = context.taskStep.objective or 'No request specified'
else:
rawPrompt = 'No request specified'
@@ -60,8 +60,8 @@ def extractUserPrompt(context: Any) -> str:
return rawPrompt
except Exception:
# Robust fallback behavior
- if hasattr(context, 'task_step') and context.task_step:
- return context.task_step.objective or 'No request specified'
+ if hasattr(context, 'taskStep') and context.taskStep:
+ return context.taskStep.objective or 'No request specified'
return 'No request specified'
def extractWorkflowHistory(service: Any, context: Any) -> str:
@@ -232,10 +232,10 @@ def getPreviousRoundContext(services, workflow: Any) -> str:
def extractReviewContent(context: Any) -> str:
"""Extract review content for result validation. Maps to {{KEY:REVIEW_CONTENT}}"""
try:
- if hasattr(context, 'action_results') and context.action_results:
+ if hasattr(context, 'actionResults') and context.actionResults:
# Build result summary
result_summary = ""
- for i, result in enumerate(context.action_results):
+ for i, result in enumerate(context.actionResults):
result_summary += f"\nRESULT {i+1}:\n"
result_summary += f" Success: {result.success}\n"
if result.error:
@@ -264,37 +264,49 @@ def extractReviewContent(context: Any) -> str:
return result_summary
elif hasattr(context, 'observation') and context.observation:
# For observation data, show full content but handle documents specially
- if isinstance(context.observation, dict):
- # Create a copy to modify
- obs_copy = context.observation.copy()
-
- # If there are previews with documents, show only metadata
- if 'previews' in obs_copy and isinstance(obs_copy['previews'], list):
- for preview in obs_copy['previews']:
- if isinstance(preview, dict) and 'snippet' in preview:
- # Replace snippet with metadata indicator
- preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]"
-
- return json.dumps(obs_copy, indent=2, ensure_ascii=False)
+ # Handle both Pydantic Observation model and dict format
+ from modules.datamodels.datamodelChat import Observation
+
+ if isinstance(context.observation, Observation):
+ # Convert Pydantic model to dict
+ obs_dict = context.observation.model_dump(exclude_none=True) if hasattr(context.observation, 'model_dump') else context.observation.dict()
+ elif isinstance(context.observation, dict):
+ obs_dict = context.observation.copy()
else:
- return json.dumps(context.observation, ensure_ascii=False)
- elif hasattr(context, 'step_result') and context.step_result and 'observation' in context.step_result:
- # For observation data in step_result, show full content but handle documents specially
- observation = context.step_result['observation']
- if isinstance(observation, dict):
- # Create a copy to modify
- obs_copy = observation.copy()
-
- # If there are previews with documents, show only metadata
- if 'previews' in obs_copy and isinstance(obs_copy['previews'], list):
- for preview in obs_copy['previews']:
- if isinstance(preview, dict) and 'snippet' in preview:
- # Replace snippet with metadata indicator
- preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]"
-
- return json.dumps(obs_copy, indent=2, ensure_ascii=False)
+ # Fallback: try to serialize as-is
+ obs_dict = context.observation.model_dump(exclude_none=True) if hasattr(context.observation, 'model_dump') else context.observation.dict()
+
+ # If there are previews with documents, show only metadata
+ if 'previews' in obs_dict and isinstance(obs_dict['previews'], list):
+ for preview in obs_dict['previews']:
+ if isinstance(preview, dict) and 'snippet' in preview:
+ # Replace snippet with metadata indicator
+ preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]"
+
+ return json.dumps(obs_dict, indent=2, ensure_ascii=False)
+ elif hasattr(context, 'stepResult') and context.stepResult and 'observation' in context.stepResult:
+ # For observation data in stepResult, show full content but handle documents specially
+ observation = context.stepResult['observation']
+ # Handle both Pydantic Observation model and dict format
+ from modules.datamodels.datamodelChat import Observation
+
+ if isinstance(observation, Observation):
+ # Convert Pydantic model to dict
+ obs_dict = observation.model_dump(exclude_none=True) if hasattr(observation, 'model_dump') else observation.dict()
+ elif isinstance(observation, dict):
+ obs_dict = observation.copy()
else:
- return json.dumps(observation, ensure_ascii=False)
+ # Fallback: try to serialize
+ obs_dict = observation.model_dump(exclude_none=True) if hasattr(observation, 'model_dump') else observation.dict()
+
+ # If there are previews with documents, show only metadata
+ if 'previews' in obs_dict and isinstance(obs_dict['previews'], list):
+ for preview in obs_dict['previews']:
+ if isinstance(preview, dict) and 'snippet' in preview:
+ # Replace snippet with metadata indicator
+ preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]"
+
+ return json.dumps(obs_dict, indent=2, ensure_ascii=False)
else:
return "No review content available"
except Exception as e:
@@ -304,11 +316,11 @@ def extractReviewContent(context: Any) -> str:
def extractPreviousActionResults(context: Any) -> str:
"""Extract previous action results for learning context. Maps to {{KEY:PREVIOUS_ACTION_RESULTS}}"""
try:
- if not hasattr(context, 'previous_action_results') or not context.previous_action_results:
+ if not hasattr(context, 'previousActionResults') or not context.previousActionResults:
return "No previous actions executed yet"
results = []
- for i, result in enumerate(context.previous_action_results[-5:], 1): # Last 5 results
+ for i, result in enumerate(context.previousActionResults[-5:], 1): # Last 5 results
if hasattr(result, 'resultLabel') and hasattr(result, 'status'):
status = "SUCCESS" if result.status == "completed" else "FAILED"
results.append(f"Action {i}: {result.resultLabel} - {status}")
@@ -332,15 +344,15 @@ def extractLearningsAndImprovements(context: Any) -> str:
learnings.append(f"- {improvement}")
# Get failure patterns
- if hasattr(context, 'failure_patterns') and context.failure_patterns and isinstance(context.failure_patterns, list):
+ if hasattr(context, 'failurePatterns') and context.failurePatterns and isinstance(context.failurePatterns, list):
learnings.append("FAILURE PATTERNS TO AVOID:")
- for pattern in context.failure_patterns[-3:]: # Last 3 patterns
+ for pattern in context.failurePatterns[-3:]: # Last 3 patterns
learnings.append(f"- {pattern}")
# Get successful actions
- if hasattr(context, 'successful_actions') and context.successful_actions and isinstance(context.successful_actions, list):
+ if hasattr(context, 'successfulActions') and context.successfulActions and isinstance(context.successfulActions, list):
learnings.append("SUCCESSFUL APPROACHES:")
- for action in context.successful_actions[-3:]: # Last 3 successful
+ for action in context.successfulActions[-3:]: # Last 3 successful
learnings.append(f"- {action}")
return "\n".join(learnings) if learnings else "No learnings available yet"
@@ -351,11 +363,11 @@ def extractLearningsAndImprovements(context: Any) -> str:
def extractLatestRefinementFeedback(context: Any) -> str:
"""Extract the latest refinement feedback. Maps to {{KEY:LATEST_REFINEMENT_FEEDBACK}}"""
try:
- if not hasattr(context, 'previous_review_result') or not context.previous_review_result or not isinstance(context.previous_review_result, list):
+ if not hasattr(context, 'previousReviewResult') or not context.previousReviewResult or not isinstance(context.previousReviewResult, list):
return "No previous refinement feedback available"
# Get the most recent refinement decision
- latest_decision = context.previous_review_result[-1]
+ latest_decision = context.previousReviewResult[-1]
if not isinstance(latest_decision, dict):
return "No previous refinement feedback available"
diff --git a/modules/workflows/processing/shared/promptGenerationActionsReact.py b/modules/workflows/processing/shared/promptGenerationActionsReact.py
index cec3c25d..338a3af6 100644
--- a/modules/workflows/processing/shared/promptGenerationActionsReact.py
+++ b/modules/workflows/processing/shared/promptGenerationActionsReact.py
@@ -169,8 +169,8 @@ Excludes documents/connections/history entirely.
# determine action objective if available, else fall back to user prompt
if hasattr(context, 'action_objective') and context.action_objective:
actionObjective = context.action_objective
- elif hasattr(context, 'task_step') and context.task_step and getattr(context.task_step, 'objective', None):
- actionObjective = context.task_step.objective
+ elif hasattr(context, 'taskStep') and context.taskStep and getattr(context.taskStep, 'objective', None):
+ actionObjective = context.taskStep.objective
else:
actionObjective = extractUserPrompt(context)
diff --git a/modules/workflows/processing/shared/promptGenerationTaskplan.py b/modules/workflows/processing/shared/promptGenerationTaskplan.py
index 31b48a02..a01dd7e6 100644
--- a/modules/workflows/processing/shared/promptGenerationTaskplan.py
+++ b/modules/workflows/processing/shared/promptGenerationTaskplan.py
@@ -80,8 +80,8 @@ Break down user requests into logical, executable task steps.
"id": "task_1",
"objective": "Clear business objective focusing on what to deliver",
"dependencies": ["task_0"],
- "success_criteria": ["measurable criteria 1", "measurable criteria 2"],
- "estimated_complexity": "low|medium|high",
+ "successCriteria": ["measurable criteria 1", "measurable criteria 2"],
+ "estimatedComplexity": "low|medium|high",
"userMessage": "What this task will accomplish in language '{{KEY:USER_LANGUAGE}}'"
}}
],
diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py
index 4d7edcca..8e3798b8 100644
--- a/modules/workflows/processing/workflowProcessor.py
+++ b/modules/workflows/processing/workflowProcessor.py
@@ -286,9 +286,9 @@ class WorkflowProcessor:
status = taskResult.status if taskResult else 'unknown'
# Handle both TaskResult and ReviewResult objects
- if hasattr(taskResult, 'met_criteria'):
+ if hasattr(taskResult, 'metCriteria'):
# This is a ReviewResult object
- met = taskResult.met_criteria if taskResult.met_criteria else []
+ met = taskResult.metCriteria if taskResult.metCriteria else []
reviewResult = taskResult.model_dump()
else:
# This is a TaskResult object
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 3b675d59..6b254007 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -338,22 +338,22 @@ class WorkflowManager:
# Build TaskContext (mode-specific behavior is inside WorkflowProcessor)
task_context = TaskContext(
- task_step=task_step,
+ taskStep=task_step,
workflow=workflow,
- workflow_id=workflow.id,
- available_documents=None,
- available_connections=None,
- previous_results=previous_results,
- previous_handover=None,
+ workflowId=workflow.id,
+ availableDocuments=None,
+ availableConnections=None,
+ previousResults=previous_results,
+ previousHandover=None,
improvements=[],
- retry_count=0,
- previous_action_results=[],
- previous_review_result=None,
- is_regeneration=False,
- failure_patterns=[],
- failed_actions=[],
- successful_actions=[],
- criteria_progress={
+ retryCount=0,
+ previousActionResults=[],
+ previousReviewResult=None,
+ isRegeneration=False,
+ failurePatterns=[],
+ failedActions=[],
+ successfulActions=[],
+ criteriaProgress={
'met_criteria': set(),
'unmet_criteria': set(),
'attempt_history': []