From 85f4c6be132bb76b4c81b71e8a1d042791a02abe Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 16 Oct 2025 20:15:55 +0200
Subject: [PATCH] Refactored streamlined validation system over all prompts
---
modules/interfaces/interfaceDbChatObjects.py | 46 +--
modules/services/serviceAi/subCoreAi.py | 43 +--
.../serviceAi/subDocumentGeneration.py | 31 +-
.../serviceAi/subDocumentProcessing.py | 83 ++---
.../services/serviceExtraction/subPipeline.py | 37 +-
.../mainServiceGeneration.py | 30 +-
.../renderers/rendererBaseTemplate.py | 31 +-
.../renderers/rendererImage.py | 24 ++
.../serviceWorkflow/mainServiceWorkflow.py | 241 +++++++-----
modules/shared/debugLogger.py | 123 +++++++
modules/workflows/methods/methodOutlook.py | 99 +++--
.../adaptive/adaptiveLearningEngine.py | 271 ++++++++++++++
.../processing/adaptive/contentValidator.py | 241 ++++--------
.../processing/adaptive/intentAnalyzer.py | 41 +--
.../processing/adaptive/learningEngine.py | 30 +-
.../processing/core/actionExecutor.py | 64 +++-
.../processing/core/messageCreator.py | 41 +--
.../workflows/processing/core/taskPlanner.py | 100 +----
.../processing/modes/modeActionplan.py | 6 +-
.../workflows/processing/modes/modeReact.py | 149 ++++++--
.../shared/promptGenerationActionsReact.py | 97 ++++-
modules/workflows/workflowManager.py | 345 ++++++++----------
22 files changed, 1277 insertions(+), 896 deletions(-)
create mode 100644 modules/shared/debugLogger.py
create mode 100644 modules/workflows/processing/adaptive/adaptiveLearningEngine.py
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index 3e5932f3..91d9b4a7 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -318,49 +318,9 @@ class ChatObjects:
# Update main workflow in database
updated = self.db.recordModify(ChatWorkflow, workflowId, simple_fields)
-
-
- # Handle object field updates (inline to avoid helper dependency)
- if 'logs' in object_fields:
- logs_data = object_fields['logs']
- try:
- for log_data in logs_data:
- if hasattr(log_data, 'model_dump'):
- log_dict = log_data.model_dump() # Pydantic v2
- elif hasattr(log_data, 'dict'):
- log_dict = log_data.dict() # Pydantic v1
- elif hasattr(log_data, 'to_dict'):
- log_dict = log_data.to_dict()
- else:
- log_dict = log_data
- log_dict["workflowId"] = workflowId
- self.createLog(log_dict)
- except Exception as e:
- logger.error(f"Error updating workflow logs: {str(e)}")
- if 'messages' in object_fields:
- messages_data = object_fields['messages']
- try:
- for message_data in messages_data:
- if hasattr(message_data, 'model_dump'):
- msg_dict = message_data.model_dump() # Pydantic v2
- elif hasattr(message_data, 'dict'):
- msg_dict = message_data.dict() # Pydantic v1
- elif hasattr(message_data, 'to_dict'):
- msg_dict = message_data.to_dict()
- else:
- msg_dict = message_data
- msg_dict["workflowId"] = workflowId
- self.updateMessage(msg_dict.get("id"), msg_dict)
- except Exception as e:
- logger.error(f"Error updating workflow messages: {str(e)}")
- if 'stats' in object_fields:
- stats_data = object_fields['stats']
- try:
- if stats_data:
- stats_data["workflowId"] = workflowId
- self.db.recordCreate(ChatStat, stats_data)
- except Exception as e:
- logger.error(f"Error updating workflow stats: {str(e)}")
+
+ # Removed cascade writes for logs/messages/stats during workflow update.
+ # CUD for child entities must be executed via dedicated service methods.
# Load fresh data from normalized tables
logs = self.getLogs(workflowId)
diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py
index 3f245334..5d7a94ac 100644
--- a/modules/services/serviceAi/subCoreAi.py
+++ b/modules/services/serviceAi/subCoreAi.py
@@ -75,13 +75,7 @@ class SubCoreAi:
else:
full_prompt = prompt
- self._writeAiResponseDebug(
- label='ai_prompt_debug',
- content=full_prompt,
- partIndex=1,
- modelName=None,
- continuation=False
- )
+ # Timestamp-only prompt debug writing removed
except Exception:
pass
@@ -473,38 +467,9 @@ class SubCoreAi:
return full_prompt
- def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None:
- """Persist raw AI response parts for debugging under test-chat/ai - only if debug enabled."""
- try:
- # Check if debug logging is enabled
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if not debug_enabled:
- return
-
- import os
- from datetime import datetime, UTC
- # Base dir: gateway/test-chat/ai (go up 4 levels from this file)
- # .../gateway/modules/services/serviceAi/subCoreAi.py -> up to gateway root
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- outDir = os.path.join(gatewayDir, 'test-chat', 'ai')
- os.makedirs(outDir, exist_ok=True)
- ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
- suffix = []
- if partIndex is not None:
- suffix.append(f"part{partIndex}")
- if continuation is not None:
- suffix.append(f"cont_{str(continuation).lower()}")
- if modelName:
- safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
- suffix.append(safeModel)
- suffixStr = ('_' + '_'.join(suffix)) if suffix else ''
- fname = f"{ts}_{label}{suffixStr}.txt"
- fpath = os.path.join(outDir, fname)
- with open(fpath, 'w', encoding='utf-8') as f:
- f.write(content or '')
- except Exception:
- # Do not raise; best-effort debug write
- pass
+ def _writeAiResponseDebug(self, label: str, content: Any, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None:
+ """Disabled verbose debug writing; only minimal files elsewhere."""
+ return
def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
"""
diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py
index 6d7ee4b7..504ba154 100644
--- a/modules/services/serviceAi/subDocumentGeneration.py
+++ b/modules/services/serviceAi/subDocumentGeneration.py
@@ -157,6 +157,20 @@ class SubDocumentGeneration:
# Call AI to enhance the content
response = await self.aiObjects.call(request)
+ # Save generation prompt and response to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ debugData = {
+ "output_format": outputFormat,
+ "title": title,
+ "context_length": len(context),
+ "extracted_content_keys": list(aiResponseJson.keys()) if isinstance(aiResponseJson, dict) else []
+ }
+ writeDebugFile(generationPrompt, "generation_single", debugData)
+ writeDebugFile(response.content or '', "generation_single_response")
+ except Exception:
+ pass
+
if response and response.content:
# Parse the AI response as JSON
try:
@@ -360,6 +374,21 @@ class SubDocumentGeneration:
# Call AI to enhance the content
response = await self.aiObjects.call(request)
+ # Save generation prompt and response to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ debugData = {
+ "output_format": outputFormat,
+ "title": doc_data["title"],
+ "document_index": i,
+ "context_length": len(context),
+ "extracted_content_keys": list(complete_document.keys()) if isinstance(complete_document, dict) else []
+ }
+ writeDebugFile(generationPrompt, f"generation_multi_doc_{i}", debugData)
+ writeDebugFile(response.content or '', f"generation_multi_doc_{i}_response")
+ except Exception:
+ pass
+
if response and response.content:
# Parse the AI response as JSON
try:
@@ -659,7 +688,7 @@ Return only the JSON response.
"documentsLabel": label,
"documents": []
}
- message = services.workflow.createMessage(messageData)
+ message = services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, [])
if not message:
return
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
index d85a5341..44dcc41c 100644
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ b/modules/services/serviceAi/subDocumentProcessing.py
@@ -104,17 +104,10 @@ class SubDocumentProcessing:
# FIXED: Merge with preserved chunk relationships
mergedContent = self._mergeChunkResults(chunkResults, options)
- # Save merged extraction content to debug file - only if debug enabled
+ # Save merged extraction content to debug
try:
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- import os
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- debug_root = "./test-chat/ai"
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_merged.txt"), "w", encoding="utf-8") as f:
- f.write(mergedContent or "")
+ from modules.shared.debugLogger import writeDebugFile
+ writeDebugFile(mergedContent or '', "extraction_merged")
except Exception:
pass
@@ -204,18 +197,12 @@ class SubDocumentProcessing:
logger.debug(f"Normalization error type: {type(e).__name__}")
# Continue with original merged JSON instead of re-raising
- # Save merged JSON extraction content to debug file - only if debug enabled
+ # Save merged JSON extraction content to debug
try:
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- import os
- import json as _json
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- debug_root = "./test-chat/ai"
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_merged.json"), "w", encoding="utf-8") as f:
- f.write(_json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2))
+ from modules.shared.debugLogger import writeDebugFile
+ import json as _json
+ jsonStr = _json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2)
+ writeDebugFile(jsonStr, "extraction_merged_json", mergedJsonDocument)
except Exception:
pass
@@ -532,21 +519,19 @@ class SubDocumentProcessing:
# Log extraction response
self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE")
- # Save full extraction prompt and response to debug file - only if debug enabled
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- try:
- import os
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- debug_root = "./test-chat/ai"
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_container_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f:
- f.write(f"EXTRACTION PROMPT:\n{prompt}\n\n")
- f.write(f"EXTRACTION CONTEXT:\n{part.data if part.data else 'No context'}\n\n")
- f.write(f"EXTRACTION RESPONSE:\n{ai_result if ai_result else 'No response'}\n")
- except Exception:
- pass
+ # Save extraction prompt and response to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ debugData = {
+ "chunk_index": chunk_index,
+ "mime_type": part.mimeType,
+ "type_group": part.typeGroup,
+ "context_length": len(part.data) if part.data else 0
+ }
+ writeDebugFile(augmented_prompt, f"extraction_chunk_{chunk_index}", debugData)
+ writeDebugFile(ai_result or '', f"extraction_chunk_{chunk_index}_response")
+ except Exception:
+ pass
# If generating JSON, validate the response
if generate_json:
@@ -641,19 +626,19 @@ class SubDocumentProcessing:
# Log extraction response length
self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE")
- # Save extraction response to debug file (without verbose prompt) - only if debug enabled
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- try:
- import os
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- debug_root = "./test-chat/ai"
- os.makedirs(debug_root, exist_ok=True)
- with open(os.path.join(debug_root, f"{ts}_extraction_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f:
- f.write(f"EXTRACTION RESPONSE:\n{ai_result if ai_result else 'No response'}\n")
- except Exception:
- pass
+ # Save extraction prompt and response to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ debugData = {
+ "chunk_index": chunk_index,
+ "mime_type": part.mimeType,
+ "type_group": part.typeGroup,
+ "context_length": len(part.data) if part.data else 0
+ }
+ writeDebugFile(augmented_prompt_text, f"extraction_chunk_{chunk_index}", debugData)
+ writeDebugFile(ai_result or '', f"extraction_chunk_{chunk_index}_response")
+ except Exception:
+ pass
# If generating JSON, validate the response
if generate_json:
diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py
index 9b18ea88..bb654b5e 100644
--- a/modules/services/serviceExtraction/subPipeline.py
+++ b/modules/services/serviceExtraction/subPipeline.py
@@ -101,42 +101,7 @@ def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: Chunker
logger.debug(f"runExtraction: Final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})")
logger.debug(f"runExtraction - Final parts: {len(parts)} (chunks: {len(chunk_parts)})")
- # DEBUG: dump parts and chunks to files - only if debug enabled
- try:
- debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- base_dir = "./test-chat/ai"
- os.makedirs(base_dir, exist_ok=True)
-
- # Generate timestamp for consistent naming
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
-
- # Write a summary file
- summary_lines: List[str] = [f"fileName: {fileName}", f"mimeType: {mimeType}", f"totalParts: {len(parts)}"]
- text_index = 0
- for idx, part in enumerate(parts):
- is_texty = part.typeGroup in ("text", "table", "structure")
- size = int(part.metadata.get("size", 0) or 0)
- is_chunk = bool(part.metadata.get("chunk", False))
- summary_lines.append(
- f"part[{idx}]: typeGroup={part.typeGroup}, label={part.label}, size={size}, chunk={is_chunk}"
- )
- if is_texty and getattr(part, "data", None):
- text_index += 1
- fname = f"{ts}_extract_{fileName}_part_{idx:03d}_{'chunk' if is_chunk else 'full'}_{text_index:03d}.txt"
- fpath = os.path.join(base_dir, fname)
- with open(fpath, "w", encoding="utf-8") as f:
- f.write(f"# typeGroup: {part.typeGroup}\n# label: {part.label}\n# chunk: {is_chunk}\n# size: {size}\n\n")
- f.write(str(part.data))
-
- # Write summary file
- summary_fname = f"{ts}_extract_{fileName}_summary.txt"
- summary_fpath = os.path.join(base_dir, summary_fname)
- with open(summary_fpath, "w", encoding="utf-8") as f:
- f.write("\n".join(summary_lines))
- except Exception as _e:
- logger.debug(f"Debug dump skipped: {_e}")
+ # Timestamp-only extraction debug dumps removed
return ContentExtracted(id=makeId(), parts=parts)
diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py
index d38cea96..53e8a848 100644
--- a/modules/services/serviceGeneration/mainServiceGeneration.py
+++ b/modules/services/serviceGeneration/mainServiceGeneration.py
@@ -319,27 +319,7 @@ class GenerationService:
if "sections" not in extractedContent:
raise ValueError("extractedContent must contain 'sections' field")
- # DEBUG: Log renderer input metadata only (no verbose JSON) - only if debug enabled
- try:
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- import os, json
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- debug_root = "./test-chat/ai"
- debug_dir = os.path.join(debug_root, f"render_input_{ts}")
- os.makedirs(debug_dir, exist_ok=True)
- with open(os.path.join(debug_dir, "meta.txt"), "w", encoding="utf-8") as f:
- f.write(f"title: {title}\nformat: {outputFormat}\ncontent_type: {type(extractedContent).__name__}\n")
- f.write(f"content_size: {len(str(extractedContent))} characters\n")
- f.write(f"sections_count: {len(extractedContent.get('sections', []))}\n")
- # Also write the extracted content JSON for inspection
- try:
- with open(os.path.join(debug_dir, "extracted_content.json"), "w", encoding="utf-8") as jf:
- json.dump(extractedContent, jf, ensure_ascii=False, indent=2)
- except Exception:
- pass
- except Exception:
- pass
+ # Remove extra debug file writes for render inputs per simplification
# Get the appropriate renderer for the format
renderer = self._getFormatRenderer(outputFormat)
@@ -348,13 +328,7 @@ class GenerationService:
# Render the JSON content directly (AI generation handled by main service)
renderedContent, mimeType = await renderer.render(extractedContent, title, userPrompt, aiService)
- # DEBUG: dump rendered output
- try:
- import os
- with open(os.path.join(debug_dir, "rendered_output.txt"), "w", encoding="utf-8") as f:
- f.write(renderedContent or "")
- except Exception:
- pass
+ # Remove extra debug output file writes
logger.info(f"Successfully rendered JSON report to {outputFormat} format: {len(renderedContent)} characters")
return renderedContent, mimeType
diff --git a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
index 150a903b..24728df4 100644
--- a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
+++ b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
@@ -91,16 +91,25 @@ class BaseRenderer(ABC):
return True
def _get_section_type(self, section: Dict[str, Any]) -> str:
- """Get the type of a section."""
- return section.get("content_type", "paragraph")
+ """Get the type of a section; default to 'paragraph' for non-dict inputs."""
+ if isinstance(section, dict):
+ return section.get("content_type", "paragraph")
+ # If section is a list or any other type, treat as paragraph elements
+ return "paragraph"
def _get_section_data(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
- """Get the elements of a section."""
- return section.get("elements", [])
+ """Get the elements of a section; if a list is provided directly, return it."""
+ if isinstance(section, dict):
+ return section.get("elements", [])
+ if isinstance(section, list):
+ return section
+ return []
def _get_section_id(self, section: Dict[str, Any]) -> str:
"""Get the ID of a section (if available)."""
- return section.get("id", "unknown")
+ if isinstance(section, dict):
+ return section.get("id", "unknown")
+ return "unknown"
def _extract_table_data(self, section_data: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]:
"""Extract table headers and rows from section data."""
@@ -332,6 +341,18 @@ class BaseRenderer(ABC):
response = await ai_service.aiObjects.call(request)
+ # Save styling prompt and response to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ debugData = {
+ "template_length": len(style_template),
+ "default_styles_keys": list(default_styles.keys()) if isinstance(default_styles, dict) else []
+ }
+ writeDebugFile(style_template, "renderer_styling", debugData)
+ writeDebugFile(response.content or '', "renderer_styling_response")
+ except Exception:
+ pass
+
import json
import re
diff --git a/modules/services/serviceGeneration/renderers/rendererImage.py b/modules/services/serviceGeneration/renderers/rendererImage.py
index 863a52e2..f54e8f82 100644
--- a/modules/services/serviceGeneration/renderers/rendererImage.py
+++ b/modules/services/serviceGeneration/renderers/rendererImage.py
@@ -59,6 +59,18 @@ class RendererImage(BaseRenderer):
# Create AI prompt for image generation
image_prompt = await self._create_image_generation_prompt(extracted_content, document_title, user_prompt, ai_service)
+ # Save image generation prompt to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ debugData = {
+ "title": document_title,
+ "user_prompt_length": len(user_prompt) if user_prompt else 0,
+ "extracted_content_keys": list(extracted_content.keys()) if isinstance(extracted_content, dict) else []
+ }
+ writeDebugFile(image_prompt, "renderer_image_generation", debugData)
+ except Exception:
+ pass
+
# Generate image using AI
image_result = await ai_service.aiObjects.generateImage(
prompt=image_prompt,
@@ -67,6 +79,18 @@ class RendererImage(BaseRenderer):
style="vivid"
)
+ # Save image generation response to debug
+ try:
+ from modules.shared.debugLogger import writeDebugFile
+ responseData = {
+ "success": image_result.get("success", False) if image_result else False,
+ "has_image_data": bool(image_result.get("image_data", "")) if image_result else False,
+ "result_keys": list(image_result.keys()) if isinstance(image_result, dict) else []
+ }
+ writeDebugFile(str(image_result), "renderer_image_generation_response", responseData)
+ except Exception:
+ pass
+
# Extract base64 image data from result
if image_result and image_result.get("success", False):
image_data = image_result.get("image_data", "")
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index fe4b4d0f..e2f03dbb 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -2,7 +2,7 @@ import logging
import uuid
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User, UserConnection
-from modules.datamodels.datamodelChat import ChatDocument, ChatMessage
+from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat
from modules.datamodels.datamodelChat import ChatContentExtracted
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
@@ -79,7 +79,29 @@ class WorkflowService:
"""Get ChatDocuments from a list of document references using all three formats."""
try:
workflow = self.services.currentWorkflow
+ logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}")
logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'}")
+
+ # Debug: list available messages with their labels and document names
+ try:
+ if workflow and hasattr(workflow, 'messages') and workflow.messages:
+ msg_lines = []
+ for message in workflow.messages:
+ label = getattr(message, 'documentsLabel', None)
+ doc_names = []
+ if getattr(message, 'documents', None):
+ for doc in message.documents:
+ name = getattr(doc, 'fileName', None) or getattr(doc, 'documentName', None) or 'Unnamed'
+ doc_names.append(name)
+ msg_lines.append(
+ f"- id={getattr(message, 'id', None)}, label={label}, docs={doc_names}"
+ )
+ if msg_lines:
+ logger.debug("getChatDocumentsFromDocumentList: available messages:\n" + "\n".join(msg_lines))
+ else:
+ logger.debug("getChatDocumentsFromDocumentList: no messages available on current workflow")
+ except Exception as e:
+ logger.debug(f"getChatDocumentsFromDocumentList: unable to enumerate messages for debug: {e}")
all_documents = []
for doc_ref in documentList:
@@ -482,24 +504,86 @@ class WorkflowService:
logger.error(f"Error getting workflow: {str(e)}")
raise
- def createMessage(self, messageData: Dict[str, Any]):
- """Create a new message by delegating to the chat interface and append to in-memory workflow."""
+ # === Service-level transactions (DB write-through + in-memory sync) ===
+
+ def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]):
+ """Persist message and documents, then bind them into in-memory workflow (replace-by-id)."""
+ # Ensure workflowId on message
+ messageData = dict(messageData or {})
+ messageData["workflowId"] = workflow.id
+ # Attach documents to message creation via interface (it persists message then docs)
+ messageDataWithDocs = dict(messageData)
+ messageDataWithDocs["documents"] = documents or []
+ chatInterface = self.interfaceDbChat
+ chatMessage = chatInterface.createMessage(messageDataWithDocs)
+ if not chatMessage:
+ raise ValueError("Failed to create message with documents")
+ # In-memory sync: replace or append
+ # replace-by-id if exists
+ replaced = False
+ for i, m in enumerate(workflow.messages or []):
+ if getattr(m, 'id', None) == getattr(chatMessage, 'id', None):
+ workflow.messages[i] = chatMessage
+ replaced = True
+ break
+ if not replaced:
+ workflow.messages.append(chatMessage)
+ return chatMessage
+
+ def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> Any:
+ """Persist ChatLog and map it into the in-memory workflow logs list."""
+ logData = dict(logData or {})
+ logData["workflowId"] = workflow.id
+ chatInterface = self.interfaceDbChat
+ chatLog = chatInterface.createLog(logData)
+ if not chatLog:
+ raise ValueError("Failed to create log")
+ # replace-by-id if exists
+ replaced = False
+ for i, lg in enumerate(workflow.logs):
+ if getattr(lg, 'id', None) == getattr(chatLog, 'id', None):
+ workflow.logs[i] = chatLog
+ replaced = True
+ break
+ if not replaced:
+ workflow.logs.append(chatLog)
+ return chatLog
+
+ def storeWorkflowStat(self, workflow: Any, statData: Dict[str, Any]) -> Any:
+ """Persist workflow-level ChatStat and set/replace on in-memory workflow."""
+ statData = dict(statData or {})
+ statData["workflowId"] = workflow.id
+ chatInterface = self.interfaceDbChat
+ # Reuse updateWorkflowStats for incremental or create raw record when needed
try:
- message = self.interfaceDbChat.createMessage(messageData)
- try:
- # Keep in-memory workflow messages in sync
- workflow = getattr(self.services, 'currentWorkflow', None)
- if workflow and hasattr(workflow, 'messages') and message:
- # Avoid duplicates if same message was already appended
- if not any(getattr(m, 'id', None) == getattr(message, 'id', None) for m in workflow.messages):
- workflow.messages.append(message)
- except Exception:
- # Never fail if local append has issues
- pass
- return message
+ self.updateWorkflowStats(workflow.id, **{
+ 'bytesSent': statData.get('bytesSent', 0),
+ 'bytesReceived': statData.get('bytesReceived', 0),
+ 'tokenCount': statData.get('tokenCount', 0)
+ })
+ except Exception:
+ pass
+ stat = chatInterface.getWorkflowStats(workflow.id)
+ workflow.stats = stat
+ return stat
+
+ def storeMessageStat(self, workflow: Any, messageId: str, statData: Dict[str, Any]) -> Any:
+ """Persist message-level ChatStat and bind to the message in-memory."""
+ statData = dict(statData or {})
+ statData["workflowId"] = workflow.id
+ statData["messageId"] = messageId
+ # Persist as ChatStat row
+ try:
+ self.interfaceDbChat.db.recordCreate(ChatStat, statData)
except Exception as e:
- logger.error(f"Error creating message: {str(e)}")
+ logger.error(f"Failed to persist message stat: {e}")
raise
+ stat = self.interfaceDbChat.getMessageStats(messageId)
+ for m in workflow.messages or []:
+ if getattr(m, 'id', None) == messageId:
+ m.stats = stat
+ break
+ return stat
def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
"""Update message by delegating to the chat interface"""
@@ -509,29 +593,6 @@ class WorkflowService:
logger.error(f"Error updating message: {str(e)}")
raise
- def createLog(self, logData: Dict[str, Any]):
- """Create a new log entry by delegating to the chat interface and append to in-memory workflow logs."""
- try:
- log_entry = self.interfaceDbChat.createLog(logData)
- try:
- workflow = getattr(self.services, 'currentWorkflow', None)
- if workflow and hasattr(workflow, 'logs') and log_entry:
- # Avoid duplicates by id if present, else compare message+timestamp tuple
- get_id = getattr(log_entry, 'id', None)
- if get_id is not None:
- if not any(getattr(l, 'id', None) == get_id for l in workflow.logs):
- workflow.logs.append(log_entry)
- else:
- key = (getattr(log_entry, 'message', None), getattr(log_entry, 'publishedAt', None))
- if not any((getattr(l, 'message', None), getattr(l, 'publishedAt', None)) == key for l in workflow.logs):
- workflow.logs.append(log_entry)
- except Exception:
- pass
- return log_entry
- except Exception as e:
- logger.error(f"Error creating log: {str(e)}")
- raise
-
def getDocumentCount(self) -> str:
"""Get document count for task planning (matching old handlingTasks.py logic)"""
try:
@@ -609,30 +670,7 @@ class WorkflowService:
# Get document reference list using the exact same logic as old system
document_list = self._getDocumentReferenceList(workflow)
- # Optional: dump a concise document index for debugging
- try:
- debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
- if debug_enabled:
- import os, json
- from datetime import datetime, UTC
- ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
- debug_root = "./test-chat/ai"
- os.makedirs(debug_root, exist_ok=True)
- doc_index = []
- for bucket in ("chat", "history"):
- for ex in document_list.get(bucket, []) or []:
- doc_index.append({
- "bucket": bucket,
- "label": ex.get("documentsLabel"),
- "documents": ex.get("documents", [])
- })
- with open(os.path.join(debug_root, f"{ts}_available_documents_index.json"), "w", encoding="utf-8") as f:
- json.dump({
- "workflowId": getattr(workflow, 'id', None),
- "index": doc_index
- }, f, ensure_ascii=False, indent=2)
- except Exception:
- pass
+ # Timestamp-only available documents index dump removed
# Build index string for AI action planning
context = ""
@@ -723,43 +761,50 @@ class WorkflowService:
except Exception:
return False
+ # Simplified, deterministic logic:
+ # - Walk messages newest-first
+ # - For each document, assign it exactly once to a bucket based on the message round
+ # - Never allow the same doc to appear in both buckets
chat_exchanges = []
history_exchanges = []
-
- in_current_round = True
+ seen_doc_ids = set()
+ current_round = getattr(workflow, 'currentRound', None)
+
for message in reversed(workflow.messages):
- is_first = message.status == "first" if hasattr(message, 'status') else False
-
- doc_exchange = None
- if message.documents:
- existing_label = getattr(message, 'documentsLabel', None)
- if existing_label:
- validated_label = self._validateDocumentLabelConsistency(message)
- doc_refs = []
- for doc in message.documents:
- if not _is_valid_document(doc):
- # Skip empty/invalid docs
- continue
- doc_ref = self._getDocumentReferenceFromChatDocument(doc, message)
- doc_refs.append(doc_ref)
- if doc_refs:
- doc_exchange = {
- 'documentsLabel': validated_label,
- 'documents': doc_refs
- }
-
- if doc_exchange:
- if in_current_round:
- chat_exchanges.append(doc_exchange)
- else:
- history_exchanges.append(doc_exchange)
-
- if in_current_round and is_first:
- in_current_round = False
-
- chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True)
- history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True)
-
+ if not getattr(message, 'documents', None):
+ continue
+
+ label = getattr(message, 'documentsLabel', None)
+ if not label:
+ # Skip messages without a label to keep references consistent
+ continue
+
+ doc_refs = []
+ for doc in message.documents:
+ if not _is_valid_document(doc):
+ continue
+ # Avoid duplicates across chat/history
+ doc_id = getattr(doc, 'id', None)
+ if not doc_id or doc_id in seen_doc_ids:
+ continue
+ seen_doc_ids.add(doc_id)
+ doc_ref = self.getDocumentReferenceFromChatDocument(doc)
+ doc_refs.append(doc_ref)
+
+ if not doc_refs:
+ continue
+
+ entry = {
+ 'documentsLabel': label,
+ 'documents': doc_refs
+ }
+
+ msg_round = getattr(message, 'roundNumber', None)
+ if current_round is not None and msg_round == current_round:
+ chat_exchanges.append(entry)
+ else:
+ history_exchanges.append(entry)
+
return {
"chat": chat_exchanges,
"history": history_exchanges
@@ -803,7 +848,7 @@ class WorkflowService:
action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0
return f"round{round_num}_task{task_num}_action{action_num}"
- def _getDocumentReferenceFromChatDocument(self, document, message) -> str:
+ def getDocumentReferenceFromChatDocument(self, document) -> str:
"""Get document reference using document ID and filename."""
try:
# Use document ID and filename for simple reference
diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py
new file mode 100644
index 00000000..fb8bde68
--- /dev/null
+++ b/modules/shared/debugLogger.py
@@ -0,0 +1,123 @@
+"""
+Simple debug logger for AI prompts and responses.
+Writes files chronologically to gateway/test-chat/ai/ with sequential numbering.
+"""
+import os
+import json
+from datetime import datetime, UTC
+from typing import Any, Optional
+
+
+def _getDebugDir() -> str:
+ """Get the debug directory path."""
+ gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+ return os.path.join(gatewayDir, 'test-chat', 'ai')
+
+
+def _getNextSequenceNumber() -> int:
+ """Get the next sequence number by counting existing files."""
+ debugDir = _getDebugDir()
+ if not os.path.exists(debugDir):
+ return 1
+
+ # Count existing numbered files
+ files = [f for f in os.listdir(debugDir) if f.startswith(('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'))]
+ return len(files) + 1
+
+
+def _formatJsonReadable(data: Any) -> str:
+ """
+ Format JSON data in a readable line-by-line structure.
+ Handles both structured objects and text representations of dicts/lists.
+
+ Args:
+ data: The data to format
+
+ Returns:
+ Formatted string representation
+ """
+ try:
+ # First try to parse if it's a string representation
+ if isinstance(data, str):
+ try:
+ # Try to parse as JSON first
+ parsed = json.loads(data)
+ data = parsed
+ except json.JSONDecodeError:
+ # Try to evaluate as Python literal (for dict/list strings)
+ try:
+ import ast
+ parsed = ast.literal_eval(data)
+ if isinstance(parsed, (dict, list)):
+ data = parsed
+ except (ValueError, SyntaxError):
+ # If all parsing fails, treat as plain text
+ pass
+
+ # Convert to JSON string with proper indentation
+ if isinstance(data, (dict, list)):
+ jsonStr = json.dumps(data, ensure_ascii=False, default=str, indent=2)
+ else:
+ jsonStr = str(data)
+
+ # Split into lines and add line numbers for better readability
+ lines = jsonStr.split('\n')
+ formattedLines = []
+
+ for i, line in enumerate(lines, 1):
+ # Add line number and proper spacing
+ lineNum = f"{i:3d}: "
+ formattedLines.append(f"{lineNum}{line}")
+
+ return '\n'.join(formattedLines)
+ except Exception:
+ # Fallback to string representation if JSON formatting fails
+ return str(data)
+
+
+def writeDebugFile(content: str, fileType: str, data: Optional[Any] = None) -> None:
+ """
+ Write debug content to a file with sequential numbering.
+
+ Args:
+ content: The main content to write
+ fileType: Type of file (e.g., 'prompt', 'response', 'placeholders')
+ data: Optional additional data to include as JSON
+ """
+ try:
+ debugDir = _getDebugDir()
+ os.makedirs(debugDir, exist_ok=True)
+
+ seqNum = _getNextSequenceNumber()
+ ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S')
+ # Add 3-digit sequence number for uniqueness
+ tsWithSeq = f"{ts}-{seqNum:03d}"
+
+ filename = f"{tsWithSeq}-{fileType}.txt"
+ filepath = os.path.join(debugDir, filename)
+
+ with open(filepath, 'w', encoding='utf-8') as f:
+ f.write(content)
+
+ # If structured data provided, also append a human-readable section to the main .txt
+ try:
+ if data is not None:
+ formatted = _formatJsonReadable(data)
+ with open(filepath, 'a', encoding='utf-8') as f:
+ f.write("\n\n=== FORMATTED DATA (human-readable) ===\n")
+ f.write(formatted)
+ f.write("\n")
+ except Exception:
+ pass
+
+ # If additional data provided, write it as a separate JSON file with readable formatting
+ if data is not None:
+ jsonFilename = f"{tsWithSeq}-{fileType}_data.json"
+ jsonFilepath = os.path.join(debugDir, jsonFilename)
+ with open(jsonFilepath, 'w', encoding='utf-8') as f:
+ formattedData = _formatJsonReadable(data)
+ f.write(formattedData)
+
+ except Exception as e:
+ # Silent fail - don't break the main flow
+ pass
diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
index f613cbe1..9909bd9f 100644
--- a/modules/workflows/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -1143,51 +1143,46 @@ class MethodOutlook(MethodBase):
chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
# Create AI prompt for email composition
- # Build document reference list for AI
+ # Build document reference list for AI with expanded list contents when possible
doc_references = documentList
doc_list_text = ""
if doc_references:
- doc_list_text = f"Available_Document_References: {', '.join(doc_references)}"
+ lines = ["Available_Document_References:"]
+ workflow_obj = getattr(self.services, 'currentWorkflow', None)
+ for ref in doc_references:
+ # Each item is a label: resolve to its document list and render contained items
+ list_docs = self.services.workflow.getChatDocumentsFromDocumentList([ref]) or []
+ if list_docs:
+ for d in list_docs:
+ doc_ref_label = self.services.workflow.getDocumentReferenceFromChatDocument(d)
+ lines.append(f"- {doc_ref_label}")
+ else:
+ lines.append(" - (no documents)")
+ doc_list_text = "\n" + "\n".join(lines)
else:
doc_list_text = "Available_Document_References: (No documents available for attachment)"
# Escape only the user-controlled context to prevent prompt injection
escaped_context = context.replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r')
- ai_prompt = f"""
-Compose a professional email based on the following context and requirements:
-
-CONTEXT:
-----------------
+ ai_prompt = f"""Compose an email based on this context:
+-------
{escaped_context}
-----------------
-
-RECIPIENT: {to}
-EMAIL STYLE: {emailStyle}
-MAX LENGTH: {maxLength} characters
+-------
+Recipients: {to}
+Style: {emailStyle}
+Max length: {maxLength} characters
{doc_list_text}
-Please generate:
-1. A clear, professional subject line
-2. A well-structured email body that addresses the context appropriately
-3. Use the {emailStyle} tone throughout
-4. Decide which documents from Available_Document_References (if any) should be attached to the email
+Based on the context, decide which documents to attach.
-Return your response in the following JSON format:
+Return JSON:
{{
- "subject": "Your generated subject line here",
- "body": "Your generated email body here (can include HTML formatting like
for line breaks)",
- "attachments": ["document_reference", "document_reference", ...]
-}}
-
-Make sure the email is:
-- Professional and appropriate for the context
-- Clear and concise
-- Well-structured with proper greeting and closing
-- Relevant to the provided context
-- Include only relevant documents as attachments (use EXACT document references from the Available_Document_References)
-"""
+ "subject": "subject line",
+ "body": "email body (HTML allowed)",
+ "attachments": ["doc_ref1", "doc_ref2"]
+}}"""
# Call AI service to generate email content
try:
@@ -1231,16 +1226,44 @@ Make sure the email is:
return ActionResult.isFailure(error="AI did not generate valid subject and body")
# Use AI-selected attachments if provided, otherwise use all documents
- if ai_attachments:
- # Filter documentList to only include AI-selected attachments
- selected_docs = [doc_ref for doc_ref in documentList if doc_ref in ai_attachments]
- if selected_docs:
- documentList = selected_docs
- logger.info(f"AI selected {len(selected_docs)} documents for attachment: {selected_docs}")
+ if documentList:
+ try:
+ available_refs = [documentList] if isinstance(documentList, str) else documentList
+ available_docs = self.services.workflow.getChatDocumentsFromDocumentList(available_refs) or []
+ except Exception:
+ available_docs = []
+
+ # Normalize AI attachments to a list of strings
+ if isinstance(ai_attachments, str):
+ ai_attachments = [ai_attachments]
+ elif isinstance(ai_attachments, list):
+ ai_attachments = [a for a in ai_attachments if isinstance(a, str)]
+
+ if ai_attachments:
+ try:
+ ai_refs = [ai_attachments] if isinstance(ai_attachments, str) else ai_attachments
+ ai_docs = self.services.workflow.getChatDocumentsFromDocumentList(ai_refs) or []
+ except Exception:
+ ai_docs = []
+
+ # Intersect by document id
+ available_ids = {getattr(d, 'id', None) for d in available_docs}
+ selected_docs = [d for d in ai_docs if getattr(d, 'id', None) in available_ids]
+
+ if selected_docs:
+ # Map selected ChatDocuments back to docItem references
+ documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in selected_docs]
+ logger.info(f"AI selected {len(documentList)} documents for attachment (resolved via ChatDocuments)")
+ else:
+ # No intersection; use all available documents
+ documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs]
+ logger.warning("AI selected attachments not found in available documents, using all documents")
else:
- logger.warning("AI selected attachments not found in available documents, using all documents")
+ # No AI selection; use all available documents
+ documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs]
+ logger.warning("AI did not specify attachments, using all available documents")
else:
- logger.info("AI did not specify attachments, using all available documents")
+ logger.info("No documents provided in documentList; skipping attachment processing")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse AI response as JSON: {str(e)}")
diff --git a/modules/workflows/processing/adaptive/adaptiveLearningEngine.py b/modules/workflows/processing/adaptive/adaptiveLearningEngine.py
new file mode 100644
index 00000000..cc21d42a
--- /dev/null
+++ b/modules/workflows/processing/adaptive/adaptiveLearningEngine.py
@@ -0,0 +1,271 @@
+# adaptiveLearningEngine.py
+# Enhanced learning engine that tracks validation patterns and adapts prompts
+
+import json
+import logging
+from typing import Dict, Any, List, Optional
+from datetime import datetime, timezone
+from collections import defaultdict
+
+logger = logging.getLogger(__name__)
+
+class AdaptiveLearningEngine:
+ """Enhanced learning engine that tracks validation patterns and adapts prompts"""
+
+ def __init__(self):
+ self.validationHistory = [] # Store validation results with context
+ self.failurePatterns = defaultdict(list) # Track failure patterns by action type
+ self.successPatterns = defaultdict(list) # Track success patterns
+ self.actionAttempts = defaultdict(int) # Track attempt counts per action
+ self.learningInsights = {} # Store learned insights per workflow
+
+ def recordValidationResult(self, validationResult: Dict[str, Any], actionContext: Dict[str, Any],
+ workflowId: str, attemptNumber: int):
+ """Record validation result and learn from it"""
+ try:
+ actionType = actionContext.get('actionType', 'unknown')
+ actionName = actionContext.get('actionName', 'unknown')
+
+ # Store validation history
+ validationEntry = {
+ 'workflowId': workflowId,
+ 'actionType': actionType,
+ 'actionName': actionName,
+ 'attemptNumber': attemptNumber,
+ 'validationResult': validationResult,
+ 'actionContext': actionContext,
+ 'timestamp': datetime.now(timezone.utc).isoformat(),
+ 'success': validationResult.get('overallSuccess', False),
+ 'qualityScore': validationResult.get('qualityScore', 0.0)
+ }
+
+ self.validationHistory.append(validationEntry)
+
+ # Track patterns
+ if validationResult.get('overallSuccess', False):
+ self.successPatterns[actionType].append(validationEntry)
+ else:
+ self.failurePatterns[actionType].append(validationEntry)
+
+ # Update attempt count
+ self.actionAttempts[f"{workflowId}:{actionType}"] += 1
+
+ # Generate learning insights
+ self._generateLearningInsights(workflowId, actionType)
+
+ logger.info(f"Recorded validation for {actionType} (attempt {attemptNumber}): "
+ f"Success={validationResult.get('overallSuccess', False)}, "
+ f"Quality={validationResult.get('qualityScore', 0.0)}")
+
+ except Exception as e:
+ logger.error(f"Error recording validation result: {str(e)}")
+
+ def getAdaptiveContextForActionSelection(self, workflowId: str, userPrompt: str) -> Dict[str, Any]:
+ """Generate adaptive context for action selection prompt"""
+ try:
+ # Get recent validation history for this workflow
+ recentValidations = [
+ v for v in self.validationHistory
+ if v['workflowId'] == workflowId
+ ][-5:] # Last 5 validations
+
+ # Analyze failure patterns
+ failureAnalysis = self._analyzeFailurePatterns(recentValidations)
+
+ # Generate specific guidance for next action
+ adaptiveGuidance = self._generateActionGuidance(userPrompt, recentValidations, failureAnalysis)
+
+ return {
+ 'recentValidations': recentValidations,
+ 'failureAnalysis': failureAnalysis,
+ 'adaptiveGuidance': adaptiveGuidance,
+ 'learningInsights': self.learningInsights.get(workflowId, {}),
+ 'escalationLevel': self._getEscalationLevel(workflowId)
+ }
+
+ except Exception as e:
+ logger.error(f"Error generating adaptive context: {str(e)}")
+ return {}
+
+ def getAdaptiveContextForParameters(self, workflowId: str, actionType: str,
+ parametersContext: str) -> Dict[str, Any]:
+ """Generate adaptive context for parameter selection prompt"""
+ try:
+ # Get validation history for this specific action type
+ actionValidations = [
+ v for v in self.validationHistory
+ if v['workflowId'] == workflowId and v['actionType'] == actionType
+ ][-3:] # Last 3 attempts for this action
+
+ # Analyze what went wrong in previous attempts
+ failureAnalysis = self._analyzeParameterFailures(actionValidations)
+
+ # Generate specific parameter guidance
+ parameterGuidance = self._generateParameterGuidance(actionType, parametersContext, failureAnalysis)
+
+ return {
+ 'actionValidations': actionValidations,
+ 'failureAnalysis': failureAnalysis,
+ 'parameterGuidance': parameterGuidance,
+ 'attemptNumber': len(actionValidations) + 1,
+ 'escalationLevel': self._getEscalationLevel(workflowId)
+ }
+
+ except Exception as e:
+ logger.error(f"Error generating parameter context: {str(e)}")
+ return {}
+
+ def _analyzeFailurePatterns(self, validations: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """Analyze failure patterns from validation history"""
+ if not validations:
+ return {}
+
+ failedValidations = [v for v in validations if not v['success']]
+
+ if not failedValidations:
+ return {'hasFailures': False}
+
+ # Extract common failure themes
+ commonIssues = []
+ for validation in failedValidations:
+ issues = validation['validationResult'].get('validationDetails', [{}])
+ for issue in issues:
+ commonIssues.extend(issue.get('issues', []))
+
+ # Count most common issues
+ issueCounts = defaultdict(int)
+ for issue in commonIssues:
+ issueCounts[issue] += 1
+
+ return {
+ 'hasFailures': True,
+ 'failureCount': len(failedValidations),
+ 'commonIssues': dict(sorted(issueCounts.items(), key=lambda x: x[1], reverse=True)),
+ 'lastFailure': failedValidations[-1] if failedValidations else None
+ }
+
+ def _analyzeParameterFailures(self, validations: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """Analyze parameter-specific failure patterns"""
+ if not validations:
+ return {'hasFailures': False}
+
+ failedValidations = [v for v in validations if not v['success']]
+
+ if not failedValidations:
+ return {'hasFailures': False}
+
+ # Extract common failure themes
+ commonIssues = []
+ for validation in failedValidations:
+ issues = validation['validationResult'].get('validationDetails', [{}])
+ for issue in issues:
+ commonIssues.extend(issue.get('issues', []))
+
+ # Count most common issues
+ issueCounts = defaultdict(int)
+ for issue in commonIssues:
+ issueCounts[issue] += 1
+
+ return {
+ 'hasFailures': True,
+ 'failureCount': len(failedValidations),
+ 'commonIssues': dict(sorted(issueCounts.items(), key=lambda x: x[1], reverse=True)),
+ 'lastFailure': failedValidations[-1] if failedValidations else None,
+ 'attemptNumber': len(validations)
+ }
+
+ def _generateActionGuidance(self, userPrompt: str, validations: List[Dict[str, Any]],
+ failureAnalysis: Dict[str, Any]) -> str:
+ """Generate specific guidance for next action based on learning"""
+ if not failureAnalysis.get('hasFailures', False):
+ return "No previous failures detected. Proceed with standard approach."
+
+ guidance_parts = []
+
+ # Add failure awareness
+ failureCount = failureAnalysis.get('failureCount', 0)
+ if failureCount >= 3:
+ guidance_parts.append("CRITICAL: Multiple previous attempts have failed. Consider alternative approaches.")
+ elif failureCount >= 1:
+ guidance_parts.append("WARNING: Previous attempts have failed. Learn from validation feedback.")
+
+ # Add specific issue guidance
+ commonIssues = failureAnalysis.get('commonIssues', {})
+ if commonIssues:
+ guidance_parts.append("COMMON FAILURE PATTERNS:")
+ for issue, count in list(commonIssues.items())[:3]: # Top 3 issues
+ guidance_parts.append(f"- {issue} (occurred {count} times)")
+
+ # Add specific action guidance based on user prompt
+ if "email" in userPrompt.lower() and "outlook" in userPrompt.lower():
+ if any("account" in str(issue).lower() for issue in commonIssues.keys()):
+ guidance_parts.append("SPECIFIC GUIDANCE: Ensure email is sent from the correct account (valueon).")
+ if any("attachment" in str(issue).lower() for issue in commonIssues.keys()):
+ guidance_parts.append("SPECIFIC GUIDANCE: Verify PDF attachment is properly included.")
+ if any("summary" in str(issue).lower() for issue in commonIssues.keys()):
+ guidance_parts.append("SPECIFIC GUIDANCE: Include German summary in email body.")
+
+ return "\n".join(guidance_parts) if guidance_parts else "No specific guidance available."
+
+ def _generateParameterGuidance(self, actionType: str, parametersContext: str,
+ failureAnalysis: Dict[str, Any]) -> str:
+ """Generate specific parameter guidance based on previous failures"""
+ if not failureAnalysis.get('hasFailures', False):
+ return "No previous parameter failures. Use standard parameter values."
+
+ guidance_parts = []
+
+ # Add attempt awareness
+ attemptNumber = failureAnalysis.get('attemptNumber', 1)
+ if attemptNumber >= 3:
+ guidance_parts.append(f"ATTEMPT #{attemptNumber}: Previous attempts failed. Adjust parameters based on validation feedback.")
+
+ # Add specific parameter guidance based on action type
+ if actionType == "outlook.composeAndSendEmailWithContext":
+ guidance_parts.append("EMAIL PARAMETER GUIDANCE:")
+ guidance_parts.append("- context: Be very specific about account (valueon), appointment time (Friday), and requirements")
+ guidance_parts.append("- emailStyle: Use 'formal' for business emails")
+ guidance_parts.append("- maxLength: Set to 2000+ for detailed emails with summaries")
+
+ # Add specific guidance based on common failures
+ commonIssues = failureAnalysis.get('commonIssues', {})
+ if any("account" in str(issue).lower() for issue in commonIssues.keys()):
+ guidance_parts.append("- context: MUST specify 'from valueon account' explicitly")
+ if any("attachment" in str(issue).lower() for issue in commonIssues.keys()):
+ guidance_parts.append("- documentList: Ensure PDF is properly referenced")
+ if any("summary" in str(issue).lower() for issue in commonIssues.keys()):
+ guidance_parts.append("- context: MUST request '10-12 sentence German summary' explicitly")
+
+ return "\n".join(guidance_parts) if guidance_parts else "Use standard parameter values."
+
+ def _getEscalationLevel(self, workflowId: str) -> str:
+ """Determine escalation level based on failure patterns"""
+ workflowValidations = [v for v in self.validationHistory if v['workflowId'] == workflowId]
+ failedAttempts = len([v for v in workflowValidations if not v['success']])
+
+ if failedAttempts >= 5:
+ return "critical"
+ elif failedAttempts >= 3:
+ return "high"
+ elif failedAttempts >= 1:
+ return "medium"
+ else:
+ return "low"
+
+ def _generateLearningInsights(self, workflowId: str, actionType: str):
+ """Generate learning insights for a workflow"""
+ if workflowId not in self.learningInsights:
+ self.learningInsights[workflowId] = {}
+
+ # Analyze patterns for this workflow
+ workflowValidations = [v for v in self.validationHistory if v['workflowId'] == workflowId]
+
+ insights = {
+ 'totalAttempts': len(workflowValidations),
+ 'successfulAttempts': len([v for v in workflowValidations if v['success']]),
+ 'failedAttempts': len([v for v in workflowValidations if not v['success']]),
+ 'lastActionType': actionType,
+ 'escalationLevel': self._getEscalationLevel(workflowId)
+ }
+
+ self.learningInsights[workflowId] = insights
diff --git a/modules/workflows/processing/adaptive/contentValidator.py b/modules/workflows/processing/adaptive/contentValidator.py
index 5423dc8e..0aa634c3 100644
--- a/modules/workflows/processing/adaptive/contentValidator.py
+++ b/modules/workflows/processing/adaptive/contentValidator.py
@@ -11,43 +11,37 @@ logger = logging.getLogger(__name__)
class ContentValidator:
"""Validates delivered content against user intent"""
- def __init__(self, services=None):
+ def __init__(self, services=None, learningEngine=None):
self.services = services
+ self.learningEngine = learningEngine
async def validateContent(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]:
- """Validates delivered content against user intent using AI"""
- try:
- # Use AI for comprehensive validation
- return await self._validateWithAI(documents, intent)
-
- except Exception as e:
- logger.error(f"Error validating content: {str(e)}")
- return self._createFailedValidationResult(str(e))
+ """Validates delivered content against user intent using AI (single attempt; parse-or-fail)"""
+ return await self._validateWithAI(documents, intent)
def _extractContent(self, doc: Any) -> str:
- """Extracts content from a document"""
+ """Extracts content from a document with size protection for large documents"""
try:
if hasattr(doc, 'documentData'):
data = doc.documentData
if isinstance(data, dict) and 'content' in data:
- return str(data['content'])
+ content = data['content']
+ # For large content, check size before converting to string
+ if hasattr(content, '__len__') and len(str(content)) > 100000: # 100KB threshold
+ # For very large content, return a size indicator instead
+ return f"[Large document content - {len(str(content))} characters - truncated for validation]"
+ return str(content)
else:
- return str(data)
+ content = data
+ # For large content, check size before converting to string
+ if hasattr(content, '__len__') and len(str(content)) > 100000: # 100KB threshold
+ return f"[Large document content - {len(str(content))} characters - truncated for validation]"
+ return str(content)
return ""
except Exception:
return ""
- def _createFailedValidationResult(self, error: str) -> Dict[str, Any]:
- """Creates a failed validation result in a schema-stable shape"""
- return {
- "overallSuccess": None, # Unknown when validator itself failed
- "qualityScore": None,
- "validationDetails": [],
- "improvementSuggestions": [f"NEXT STEP: Fix validation error - {error}. Check system logs for more details and retry the operation."],
- "schemaCompliant": False,
- "originalType": "error",
- "missingFields": ["overallSuccess", "qualityScore"],
- }
+ # Removed schema fallback creator to keep failures explicit
def _isValidJsonResponse(self, response: str) -> bool:
"""Checks if response contains valid JSON structure"""
@@ -62,46 +56,7 @@ class ContentValidator:
except:
return False
- def _extractFallbackValidationResult(self, response: str) -> Dict[str, Any]:
- """Extracts a minimal validation result from a malformed AI response (schema-stable)"""
- try:
- import re
-
- # Extract key values using regex patterns
- overall_success = re.search(r'"overallSuccess"\s*:\s*(true|false)', response, re.IGNORECASE)
- quality_score = re.search(r'"qualityScore"\s*:\s*([0-9.]+)', response)
- gap_analysis = re.search(r'"gapAnalysis"\s*:\s*"([^"]*)"', response)
-
- # Determine overall success from context if not found
- if not overall_success:
- # Look for positive/negative indicators in the text
- if any(word in response.lower() for word in ['success', 'complete', 'fulfilled', 'satisfied']):
- overall_success = True
- elif any(word in response.lower() for word in ['failed', 'incomplete', 'missing', 'error']):
- overall_success = False
- else:
- overall_success = False
-
- parsed_overall = overall_success if isinstance(overall_success, bool) else (overall_success.group(1).lower() == 'true' if overall_success else None)
- parsed_quality = float(quality_score.group(1)) if quality_score else None
-
- result = {
- "overallSuccess": parsed_overall,
- "qualityScore": parsed_quality,
- "validationDetails": [{
- "documentName": "AI Validation (Fallback)",
- "gapAnalysis": gap_analysis.group(1) if gap_analysis else "Unable to parse detailed analysis",
- "successCriteriaMet": []
- }],
- "improvementSuggestions": ["NEXT STEP: AI response was malformed - retry the operation for better results"],
- "schemaCompliant": False,
- "originalType": "text",
- "missingFields": [k for k, v in {"overallSuccess": parsed_overall, "qualityScore": parsed_quality}.items() if v is None],
- }
- return result
- except Exception as e:
- logger.error(f"Fallback extraction failed: {str(e)}")
- return None
+ # Removed text-based fallback extraction to avoid hiding issues
async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]:
"""AI-based comprehensive validation - single main function"""
@@ -118,46 +73,46 @@ class ContentValidator:
"content": content[:2000] # Limit content for AI processing
})
- # Create comprehensive AI validation prompt
- validationPrompt = f"""
-You are a comprehensive task completion validator. Analyze if the delivered content fulfills the user's request.
+ # Create structured AI validation prompt
+ successCriteria = intent.get('successCriteria', [])
+ criteriaCount = len(successCriteria)
+
+ validationPrompt = f"""TASK VALIDATION
-USER REQUEST: {intent.get('primaryGoal', 'Unknown')}
-EXPECTED DATA TYPE: {intent.get('dataType', 'unknown')}
+USER REQUEST: '{intent.get('primaryGoal', 'Unknown')}'
+EXPECTED TYPE: {intent.get('dataType', 'unknown')}
EXPECTED FORMAT: {intent.get('expectedFormat', 'unknown')}
-SUCCESS CRITERIA: {intent.get('successCriteria', [])}
+SUCCESS CRITERIA ({criteriaCount} items): {successCriteria}
-DELIVERED CONTENT:
+VALIDATION RULES:
+1. Check if content matches expected data type
+2. Check if content matches expected format
+3. Verify each success criterion is met
+4. Rate overall quality (0.0-1.0)
+5. Identify specific gaps
+6. Suggest next steps
+
+OUTPUT FORMAT - JSON ONLY (no prose):
+{{
+ "overallSuccess": false,
+ "qualityScore": 0.0,
+ "dataTypeMatch": false,
+ "formatMatch": false,
+ "successCriteriaMet": {[False] * criteriaCount},
+ "gapAnalysis": "Specific gaps found",
+ "improvementSuggestions": ["NEXT STEP: Action 1", "NEXT STEP: Action 2"],
+ "validationDetails": [
+ {{
+ "documentName": "Document Name",
+ "issues": ["Issue 1", "Issue 2"],
+ "suggestions": ["NEXT STEP: Fix 1", "NEXT STEP: Fix 2"]
+ }}
+ ]
+}}
+
+DELIVERED CONTENT TO CHECK:
{json.dumps(documentContents, indent=2)}
-Perform comprehensive validation:
-1. Check if content matches expected data type
-2. Check if content matches expected format
-3. Verify success criteria are met
-4. Assess overall quality and completeness
-5. Identify specific gaps and issues
-6. Provide actionable next steps
-
-CRITICAL: You MUST respond with ONLY the JSON object below. NO TEXT ANALYSIS. NO EXPLANATIONS. NO OTHER CONTENT.
-
-RESPOND WITH THIS EXACT JSON FORMAT:
-
-{{
- "overallSuccess": false,
- "qualityScore": 0.5,
- "dataTypeMatch": false,
- "formatMatch": false,
- "successCriteriaMet": [false, false],
- "gapAnalysis": "Content does not match expected format and lacks required elements",
- "improvementSuggestions": ["NEXT STEP: Create proper content in expected format", "NEXT STEP: Ensure all success criteria are met"],
- "validationDetails": [
- {{
- "documentName": "Content Validation",
- "issues": ["Format mismatch", "Missing required elements"],
- "suggestions": ["NEXT STEP: Fix format", "NEXT STEP: Add missing elements"]
- }}
- ]
-}}
"""
# Call AI service for validation
@@ -170,56 +125,24 @@ RESPOND WITH THIS EXACT JSON FORMAT:
documents=None,
options=request_options
)
+ # Write validation prompt/response to debug
+ from modules.shared.debugLogger import writeDebugFile
+ writeDebugFile(validationPrompt, "validation_content_prompt")
+ writeDebugFile(response or '', "validation_content_response")
- # If first attempt fails, try with more explicit prompt
- if response and not self._isValidJsonResponse(response):
- logger.debug("First AI validation attempt failed, retrying with explicit JSON-only prompt")
- explicitPrompt = f"""
-VALIDATE AND RETURN JSON ONLY - NO TEXT ANALYSIS
+ # No retries or correction prompts here; parse-or-fail below
-Request: {intent.get('primaryGoal', 'Unknown')}
-Data Type: {intent.get('dataType', 'unknown')}
-Format: {intent.get('expectedFormat', 'unknown')}
-Criteria: {intent.get('successCriteria', [])}
-
-Content: {json.dumps(documentContents, indent=2)}
-
-RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT:
-
-{{
- "overallSuccess": false,
- "qualityScore": 0.3,
- "dataTypeMatch": false,
- "formatMatch": false,
- "successCriteriaMet": [false, false],
- "gapAnalysis": "Content does not match expected format and lacks required elements",
- "improvementSuggestions": ["NEXT STEP: Create proper content in expected format", "NEXT STEP: Ensure all success criteria are met"],
- "validationDetails": [
- {{
- "documentName": "Content Validation",
- "issues": ["Format mismatch", "Missing required elements"],
- "suggestions": ["NEXT STEP: Fix format", "NEXT STEP: Add missing elements"]
- }}
- ]
-}}
-"""
- response = await self.services.ai.callAi(
- prompt=explicitPrompt,
- documents=None,
- options=request_options
- )
-
if not response or not response.strip():
logger.warning("AI validation returned empty response")
- return self._createFailedValidationResult("AI validation failed - empty response")
-
+ raise ValueError("AI validation failed - empty response")
+
# Clean and extract JSON from response
result = response.strip()
logger.debug(f"AI validation response length: {len(result)}")
-
+
# Try to find JSON in the response with multiple strategies
import re
-
+
# Strategy 1: Look for JSON in markdown code blocks
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', result, re.DOTALL)
if json_match:
@@ -231,23 +154,15 @@ RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT:
if not json_match:
# Strategy 3: Look for any JSON object
json_match = re.search(r'\{.*\}', result, re.DOTALL)
-
+
if json_match:
result = json_match.group(0)
logger.debug(f"Extracted JSON directly: {result[:200]}...")
else:
- logger.debug(f"No JSON found in AI response, trying fallback extraction: {result[:200]}...")
+ logger.debug(f"No JSON found in AI response: {result[:200]}...")
logger.debug(f"Full AI response: {result}")
-
- # Try fallback extraction for text responses
- fallback_result = self._extractFallbackValidationResult(result)
- if fallback_result:
- logger.info("Using fallback text extraction for validation")
- return fallback_result
-
- logger.warning("All AI validation attempts failed - no JSON found and fallback extraction failed")
- return self._createFailedValidationResult("AI validation failed - no JSON in response")
-
+ raise ValueError("AI validation failed - no JSON in response")
+
try:
aiResult = json.loads(result)
logger.info("AI validation JSON parsed successfully")
@@ -259,7 +174,7 @@ RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT:
criteria = aiResult.get("successCriteriaMet")
improvements = aiResult.get("improvementSuggestions", [])
- # Normalize into schema-stable object without forcing failure defaults
+ # Normalize while keeping failures explicit
normalized = {
"overallSuccess": overall if isinstance(overall, bool) else None,
"qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
@@ -278,26 +193,18 @@ RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT:
normalized["missingFields"].append("overallSuccess")
if normalized["qualityScore"] is None:
normalized["missingFields"].append("qualityScore")
- # If any critical field missing, mark as not fully compliant
if normalized["missingFields"]:
normalized["schemaCompliant"] = False
return normalized
-
+
except json.JSONDecodeError as json_error:
- logger.warning(f"All AI validation attempts failed - invalid JSON: {str(json_error)}")
+ logger.warning(f"AI validation invalid JSON: {str(json_error)}")
logger.debug(f"JSON content: {result}")
-
- # Try to extract key information from malformed response
- fallbackResult = self._extractFallbackValidationResult(result)
- if fallbackResult:
- logger.info("Using fallback validation result from malformed JSON")
- return fallbackResult
-
- return self._createFailedValidationResult(f"AI validation failed - invalid JSON: {str(json_error)}")
-
- return self._createFailedValidationResult("AI validation failed - no response")
-
+ raise
+
+ raise ValueError("AI validation failed - no response")
+
except Exception as e:
logger.error(f"AI validation failed: {str(e)}")
- return self._createFailedValidationResult(f"AI validation error: {str(e)}")
\ No newline at end of file
+ raise
\ No newline at end of file
diff --git a/modules/workflows/processing/adaptive/intentAnalyzer.py b/modules/workflows/processing/adaptive/intentAnalyzer.py
index e7f10cab..f5fd5616 100644
--- a/modules/workflows/processing/adaptive/intentAnalyzer.py
+++ b/modules/workflows/processing/adaptive/intentAnalyzer.py
@@ -14,19 +14,11 @@ class IntentAnalyzer:
self.services = services
async def analyzeUserIntent(self, userPrompt: str, context: Any) -> Dict[str, Any]:
- """Analyzes user intent from prompt and context using AI"""
- try:
- # Use AI to analyze intent
- aiAnalysis = await self._analyzeIntentWithAI(userPrompt, context)
- if aiAnalysis:
- return aiAnalysis
-
- # Fallback to basic analysis if AI fails
- return self._createBasicIntentAnalysis(userPrompt)
-
- except Exception as e:
- logger.error(f"Error analyzing user intent: {str(e)}")
- return self._createDefaultIntentAnalysis(userPrompt)
+ """Analyzes user intent from prompt and context using AI (single attempt, no fallbacks)"""
+ aiAnalysis = await self._analyzeIntentWithAI(userPrompt, context)
+ if not aiAnalysis:
+ raise ValueError("AI intent analysis failed: empty or invalid response")
+ return aiAnalysis
async def _analyzeIntentWithAI(self, userPrompt: str, context: Any) -> Dict[str, Any]:
"""Uses AI to analyze user intent - language-agnostic"""
@@ -68,26 +60,19 @@ CRITICAL: Respond with ONLY the JSON object below. Do not include any explanator
from modules.datamodels.datamodelAi import AiCallOptions, OperationType
request_options = AiCallOptions()
request_options.operationType = OperationType.GENERAL
+ # Write prompt to debug
+ from modules.shared.debugLogger import writeDebugFile
+ writeDebugFile(analysisPrompt, "intent_prompt")
response = await self.services.ai.callAi(
prompt=analysisPrompt,
documents=None,
options=request_options
)
+ # Write response to debug
+ writeDebugFile(response or '', "intent_response")
- # If first attempt fails, try with more explicit prompt
- if response and not self._isValidJsonResponse(response):
- logger.debug("First AI intent analysis attempt failed, retrying with explicit JSON-only prompt")
- explicitPrompt = f"""
-{analysisPrompt}
-
-IMPORTANT: You must respond with ONLY valid JSON. No explanations, no analysis, no text before or after. Just the JSON object.
-"""
- response = await self.services.ai.callAi(
- prompt=explicitPrompt,
- documents=None,
- options=request_options
- )
+ # No retries or correction prompts here; parse-or-fail below
if not response or not response.strip():
logger.warning("AI intent analysis returned empty response")
@@ -113,7 +98,7 @@ IMPORTANT: You must respond with ONLY valid JSON. No explanations, no analysis,
json_match = re.search(r'\{.*\}', result, re.DOTALL)
if not json_match:
- logger.warning(f"All AI intent analysis attempts failed - no JSON found in response: {result[:200]}...")
+ logger.warning(f"AI intent analysis failed - no JSON found in response: {result[:200]}...")
logger.debug(f"Full AI response: {result}")
return None
@@ -126,7 +111,7 @@ IMPORTANT: You must respond with ONLY valid JSON. No explanations, no analysis,
return aiResult
except json.JSONDecodeError as json_error:
- logger.warning(f"All AI intent analysis attempts failed - invalid JSON: {str(json_error)}")
+ logger.warning(f"AI intent analysis invalid JSON: {str(json_error)}")
logger.debug(f"JSON content: {result}")
return None
diff --git a/modules/workflows/processing/adaptive/learningEngine.py b/modules/workflows/processing/adaptive/learningEngine.py
index 2d5836a6..f18d1d17 100644
--- a/modules/workflows/processing/adaptive/learningEngine.py
+++ b/modules/workflows/processing/adaptive/learningEngine.py
@@ -29,8 +29,21 @@ class LearningEngine:
# Update strategies based on feedback
self._updateStrategies(feedback, intent)
- logger.info(f"Learning from feedback: {feedback.get('actionAttempted', 'unknown')} - "
- f"Quality: {feedback.get('qualityScore', 0):.2f}, Intent Match: {feedback.get('intentMatchScore', 0):.2f}")
+ # Normalize scores for safe logging
+ _qs = feedback.get('qualityScore', 0.0)
+ _im = feedback.get('intentMatchScore', 0.0)
+ try:
+ _qs = float(0.0 if _qs is None else _qs)
+ except Exception:
+ _qs = 0.0
+ try:
+ _im = float(0.0 if _im is None else _im)
+ except Exception:
+ _im = 0.0
+ logger.info(
+ f"Learning from feedback: {feedback.get('actionAttempted', 'unknown')} - "
+ f"Quality: {_qs:.2f}, Intent Match: {_im:.2f}"
+ )
except Exception as e:
logger.error(f"Error learning from feedback: {str(e)}")
@@ -61,8 +74,17 @@ class LearningEngine:
"""Updates strategies based on feedback"""
strategyKey = self._getStrategyKey(intent)
actionAttempted = feedback.get('actionAttempted', 'unknown')
- qualityScore = feedback.get('qualityScore', 0)
- intentMatchScore = feedback.get('intentMatchScore', 0)
+ # Coerce possibly None or non-numeric to floats
+ qs_raw = feedback.get('qualityScore', 0.0)
+ im_raw = feedback.get('intentMatchScore', 0.0)
+ try:
+ qualityScore = float(0.0 if qs_raw is None else qs_raw)
+ except Exception:
+ qualityScore = 0.0
+ try:
+ intentMatchScore = float(0.0 if im_raw is None else im_raw)
+ except Exception:
+ intentMatchScore = 0.0
# Get or create strategy
if strategyKey not in self.strategies:
diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py
index 95a41606..1060e954 100644
--- a/modules/workflows/processing/core/actionExecutor.py
+++ b/modules/workflows/processing/core/actionExecutor.py
@@ -156,10 +156,9 @@ class ActionExecutor:
action.setError(result.error or "Action execution failed")
logger.error(f"Action failed: {result.error}")
- # Create database log entry for action failure
- self.services.interfaceDbChat.createLog({
- "workflowId": workflow.id,
- "message": f"❌ **Task {taskNum}**\n\n❌ **Action {actionNum}/{totalActions}** failed: {result.error}",
+ # Create database log entry for action failure (write-through + bind)
+ self.services.workflow.storeLog(workflow, {
+ "message": f"❌ **Task {taskNum}**❌ **Action {actionNum}/{totalActions}** failed: {result.error}",
"type": "error"
})
@@ -227,7 +226,7 @@ class ActionExecutor:
logger.error(f"Error creating action completion message: {str(e)}")
def _writeTraceLog(self, contextText: str, data: Any) -> None:
- """Write trace data to configured trace file if in debug mode with improved JSON formatting"""
+ """Write trace data and categorized React debug files when in DEBUG level"""
try:
import os
import json
@@ -247,8 +246,41 @@ class ActionExecutor:
# Ensure log directory exists
os.makedirs(logDir, exist_ok=True)
- # Create trace file path
+ # Create trace file path (aggregate)
traceFile = os.path.join(logDir, "log_trace.log")
+
+ # Derive a React-category filename based on context
+ def _reactFileForContext(text: str) -> str:
+ t = (text or "").lower()
+ if "action result" in t:
+ return "react_action_results.jsonl"
+ if "extraction" in t:
+ if "prompt" in t:
+ return "react_extraction_prompts.jsonl"
+ if "response" in t:
+ return "react_extraction_responses.jsonl"
+ if "generation" in t:
+ if "prompt" in t:
+ return "react_generation_prompts.jsonl"
+ if "response" in t:
+ return "react_generation_responses.jsonl"
+ if "render" in t:
+ if "prompt" in t:
+ return "react_rendering_prompts.jsonl"
+ if "response" in t:
+ return "react_rendering_responses.jsonl"
+ if "validation" in t:
+ if "prompt" in t:
+ return "react_validation_prompts.jsonl"
+ if "response" in t:
+ return "react_validation_responses.jsonl"
+ return "react_misc.jsonl"
+
+ # Daily suffix for React files
+ dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d")
+ baseReactFile = _reactFileForContext(contextText)
+ name, ext = os.path.splitext(baseReactFile)
+ reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}")
# Format the trace entry with better structure
timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
@@ -296,6 +328,26 @@ class ActionExecutor:
# Write to trace file
with open(traceFile, "a", encoding="utf-8") as f:
f.write(traceEntry)
+
+ # Also write a compact JSONL record to the categorized React file
+ try:
+ workflowId = getattr(getattr(self, 'services', None), 'currentWorkflow', None)
+ if hasattr(workflowId, 'id'):
+ workflowId = workflowId.id
+ # We have action context within executor only sometimes; include when accessible
+ reactRecord = {
+ "timestamp": timestamp,
+ "context": contextText,
+ "workflowId": workflowId,
+ "round": getattr(getattr(self, 'workflow', None), 'currentRound', None) if hasattr(self, 'workflow') else None,
+ "task": getattr(getattr(self, 'workflow', None), 'currentTask', None) if hasattr(self, 'workflow') else None,
+ "action": getattr(getattr(self, 'workflow', None), 'currentAction', None) if hasattr(self, 'workflow') else None,
+ "data": data
+ }
+ with open(reactFile, "a", encoding="utf-8") as rf:
+ rf.write(json.dumps(reactRecord, ensure_ascii=False, default=str) + "\n")
+ except Exception:
+ pass
except Exception as e:
# Don't log trace errors to avoid recursion
diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py
index 6b9422d1..1bf75fb7 100644
--- a/modules/workflows/processing/core/messageCreator.py
+++ b/modules/workflows/processing/core/messageCreator.py
@@ -67,10 +67,8 @@ class MessageCreator:
"taskProgress": "pending"
}
- message = self.services.interfaceDbChat.createMessage(messageData)
- if message:
- workflow.messages.append(message)
- logger.info("Task plan message created successfully")
+ message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ logger.info("Task plan message created successfully")
except Exception as e:
logger.error(f"Error creating task plan message: {str(e)}")
@@ -103,10 +101,8 @@ class MessageCreator:
if taskStep.userMessage:
taskStartMessage["message"] += f"\n\n💬 {taskStep.userMessage}"
- message = self.services.interfaceDbChat.createMessage(taskStartMessage)
- if message:
- workflow.messages.append(message)
- logger.info(f"Task start message created for task {taskIndex}")
+ message = self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, [])
+ logger.info(f"Task start message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating task start message: {str(e)}")
@@ -186,14 +182,9 @@ class MessageCreator:
logger.info(f"Creating ERROR message: {messageText}")
logger.info(f"Message data: {messageData}")
- message = self.services.interfaceDbChat.createMessage(messageData)
- if message:
- workflow.messages.append(message)
- logger.info(f"Message created: {action.execMethod}.{action.execAction}")
- return message
- else:
- logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}")
- return None
+ message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments)
+ logger.info(f"Message created: {action.execMethod}.{action.execAction}")
+ return message
except Exception as e:
logger.error(f"Error creating action message: {str(e)}")
return None
@@ -236,10 +227,8 @@ class MessageCreator:
"taskProgress": "success"
}
- message = self.services.interfaceDbChat.createMessage(taskCompletionMessage)
- if message:
- workflow.messages.append(message)
- logger.info(f"Task completion message created for task {taskIndex}")
+ message = self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
+ logger.info(f"Task completion message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating task completion message: {str(e)}")
@@ -265,10 +254,8 @@ class MessageCreator:
"taskProgress": "retry"
}
- message = self.services.interfaceDbChat.createMessage(retryMessage)
- if message:
- workflow.messages.append(message)
- logger.info(f"Retry message created for task {taskIndex}")
+ message = self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, [])
+ logger.info(f"Retry message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating retry message: {str(e)}")
@@ -306,10 +293,8 @@ class MessageCreator:
"taskProgress": "fail"
}
- message = self.services.interfaceDbChat.createMessage(messageData)
- if message:
- workflow.messages.append(message)
- logger.info(f"Error message created for task {taskIndex}")
+ message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+ logger.info(f"Error message created for task {taskIndex}")
except Exception as e:
logger.error(f"Error creating error message: {str(e)}")
diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py
index d9f52996..dc7c992f 100644
--- a/modules/workflows/processing/core/taskPlanner.py
+++ b/modules/workflows/processing/core/taskPlanner.py
@@ -9,6 +9,7 @@ from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Process
from modules.workflows.processing.shared.promptGenerationTaskplan import (
generateTaskPlanningPrompt
)
+from modules.workflows.processing.adaptive import IntentAnalyzer
logger = logging.getLogger(__name__)
@@ -50,11 +51,16 @@ class TaskPlanner:
# Check workflow status before calling AI service
self._checkWorkflowStopped(workflow)
- # Create proper context object for task planning
+ # Analyze user intent to obtain cleaned user objective for planning
+ intentAnalyzer = IntentAnalyzer(self.services)
+ intent = await intentAnalyzer.analyzeUserIntent(actualUserPrompt, None)
+ cleanedObjective = intent.get('primaryGoal', actualUserPrompt) if isinstance(intent, dict) else actualUserPrompt
+
+ # Create proper context object for task planning using cleaned intent
# For task planning, we need to create a minimal TaskStep since TaskContext requires it
planningTaskStep = TaskStep(
id="planning",
- objective=actualUserPrompt,
+ objective=cleanedObjective,
dependencies=[],
success_criteria=[],
estimated_complexity="medium"
@@ -88,11 +94,9 @@ class TaskPlanner:
taskPlanningPromptTemplate = bundle.prompt
placeholders = bundle.placeholders
- # Log task planning prompt sent to AI
- logger.info("=== TASK PLANNING PROMPT SENT TO AI ===")
- # Trace task planning prompt
- self._writeTraceLog("Task Plan Prompt", taskPlanningPromptTemplate)
- self._writeTraceLog("Task Plan Placeholders", placeholders)
+ # Write task planning prompt to debug
+ from modules.shared.debugLogger import writeDebugFile
+ writeDebugFile(taskPlanningPromptTemplate, "taskplan_prompt", placeholders)
# Centralized AI call: Task planning (quality, detailed) with placeholders
options = AiCallOptions(
@@ -115,11 +119,8 @@ class TaskPlanner:
if not prompt:
raise ValueError("AI service returned no response for task planning")
- # Log task planning response received
- logger.info("=== TASK PLANNING AI RESPONSE RECEIVED ===")
- logger.info(f"Response length: {len(prompt) if prompt else 0}")
- # Trace task planning response
- self._writeTraceLog("Task Plan Response", prompt)
+ # Write task planning response to debug
+ writeDebugFile(prompt or '', "taskplan_response")
# Parse task plan response
try:
@@ -258,76 +259,5 @@ class TaskPlanner:
return False
def _writeTraceLog(self, contextText: str, data: Any) -> None:
- """Write trace data to configured trace file if in debug mode with improved JSON formatting"""
- try:
- import os
- import json
- from datetime import datetime, UTC
-
- # Only write if logger is in debug mode
- if logger.level > logging.DEBUG:
- return
-
- # Get log directory from configuration
- logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
- if not os.path.isabs(logDir):
- # If relative path, make it relative to the gateway directory
- gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- logDir = os.path.join(gatewayDir, logDir)
-
- # Ensure log directory exists
- os.makedirs(logDir, exist_ok=True)
-
- # Create trace file path
- traceFile = os.path.join(logDir, "log_trace.log")
-
- # Format the trace entry with better structure
- timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
-
- # Create a structured trace entry
- traceEntry = f"[{timestamp}] {contextText}\n"
- traceEntry += "=" * 80 + "\n"
-
- # Add data if provided with improved formatting
- if data is not None:
- try:
- if isinstance(data, (dict, list)):
- # Format as pretty JSON with better settings
- jsonStr = json.dumps(data, indent=2, default=str, ensure_ascii=False, sort_keys=False)
- traceEntry += f"JSON Data:\n{jsonStr}\n"
- elif isinstance(data, str):
- # For string data, try to parse as JSON first, then fall back to plain text
- try:
- parsed = json.loads(data)
- jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False)
- traceEntry += f"JSON Data (parsed from string):\n{jsonStr}\n"
- except (json.JSONDecodeError, TypeError):
- # Not valid JSON, show as plain text with proper line breaks
- formatted_data = data.replace('\\n', '\n')
- traceEntry += f"Text Data:\n{formatted_data}\n"
- else:
- # For other types, convert to string and try to parse as JSON
- dataStr = str(data)
- try:
- parsed = json.loads(dataStr)
- jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False)
- traceEntry += f"JSON Data (parsed from object):\n{jsonStr}\n"
- except (json.JSONDecodeError, TypeError):
- # Not valid JSON, show as plain text with proper line breaks
- formatted_data = dataStr.replace('\\n', '\n')
- traceEntry += f"Object Data:\n{formatted_data}\n"
- except Exception as e:
- # Fallback to simple string representation
- traceEntry += f"Data (fallback): {str(data)}\n"
- else:
- traceEntry += "No data provided\n"
-
- traceEntry += "=" * 80 + "\n\n"
-
- # Write to trace file
- with open(traceFile, "a", encoding="utf-8") as f:
- f.write(traceEntry)
-
- except Exception as e:
- # Don't log trace errors to avoid recursion
- pass
+ """Disabled extra trace file outputs (per chat debug simplification)."""
+ return
diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py
index fdee8ff7..c1521a71 100644
--- a/modules/workflows/processing/modes/modeActionplan.py
+++ b/modules/workflows/processing/modes/modeActionplan.py
@@ -301,10 +301,8 @@ class ActionplanMode(BaseMode):
if action.userMessage:
actionStartMessage["message"] += f"\n\n💬 {action.userMessage}"
- message = self.services.interfaceDbChat.createMessage(actionStartMessage)
- if message:
- workflow.messages.append(message)
- logger.info(f"Action start message created for action {actionNumber}")
+ self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, [])
+ logger.info(f"Action start message created for action {actionNumber}")
# Execute single action
result = await self.actionExecutor.executeSingleAction(action, workflow, taskStep,
diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py
index 86c25172..0503e56a 100644
--- a/modules/workflows/processing/modes/modeReact.py
+++ b/modules/workflows/processing/modes/modeReact.py
@@ -20,8 +20,10 @@ from modules.workflows.processing.shared.promptGenerationActionsReact import (
generateReactParametersPrompt,
generateReactRefinementPrompt
)
+from modules.shared.debugLogger import writeDebugFile
from modules.workflows.processing.shared.placeholderFactory import extractReviewContent
from modules.workflows.processing.adaptive import IntentAnalyzer, ContentValidator, LearningEngine, ProgressTracker
+from modules.workflows.processing.adaptive.adaptiveLearningEngine import AdaptiveLearningEngine
logger = logging.getLogger(__name__)
@@ -32,8 +34,9 @@ class ReactMode(BaseMode):
super().__init__(services, workflow)
# Initialize adaptive components
self.intentAnalyzer = IntentAnalyzer(services)
- self.contentValidator = ContentValidator(services)
self.learningEngine = LearningEngine()
+ self.adaptiveLearningEngine = AdaptiveLearningEngine() # New enhanced learning engine
+ self.contentValidator = ContentValidator(services, self.adaptiveLearningEngine)
self.progressTracker = ProgressTracker()
self.currentIntent = None
# Placeholder service no longer used; prompts are generated directly
@@ -109,6 +112,20 @@ class ReactMode(BaseMode):
quality_score = 0.0
logger.info(f"Content validation: {validationResult['overallSuccess']} (quality: {quality_score:.2f})")
+ # NEW: Record validation result for adaptive learning
+ actionContext = {
+ 'actionType': selection.get('action', {}).get('action', 'unknown'),
+ 'actionName': selection.get('action', {}).get('action', 'unknown'),
+ 'workflowId': context.workflow_id
+ }
+
+ self.adaptiveLearningEngine.recordValidationResult(
+ validationResult,
+ actionContext,
+ context.workflow_id,
+ step
+ )
+
# NEW: Learn from feedback
feedback = self._collectFeedback(result, validationResult, self.workflowIntent)
self.learningEngine.learnFromFeedback(feedback, context, self.workflowIntent)
@@ -132,8 +149,7 @@ class ReactMode(BaseMode):
# Telemetry: simple duration per step
duration = time.time() - t0
- self.services.interfaceDbChat.createLog({
- "workflowId": workflow.id,
+ self.services.workflow.storeLog(workflow, {
"message": f"react_step_duration_sec={duration:.3f}",
"type": "info"
})
@@ -177,12 +193,13 @@ class ReactMode(BaseMode):
async def _planSelect(self, context: TaskContext) -> Dict[str, Any]:
"""Plan: select exactly one action. Returns {"action": {method, name}}"""
- bundle = generateReactPlanSelectionPrompt(self.services, context)
+ bundle = generateReactPlanSelectionPrompt(self.services, context, self.adaptiveLearningEngine)
promptTemplate = bundle.prompt
placeholders = bundle.placeholders
- self._writeTraceLog("React Plan Selection Prompt", promptTemplate)
- self._writeTraceLog("React Plan Selection Placeholders", placeholders)
+ # Write action selection prompt to debug
+ from modules.shared.debugLogger import writeDebugFile
+ writeDebugFile(promptTemplate, "action_selection_prompt", placeholders)
# Centralized AI call for plan selection (use plan generation quality)
options = AiCallOptions(
@@ -200,7 +217,8 @@ class ReactMode(BaseMode):
placeholders=placeholders,
options=options
)
- self._writeTraceLog("React Plan Selection Response", response)
+ # Write action selection response to debug
+ writeDebugFile(response or '', "action_selection_response")
jsonStart = response.find('{') if response else -1
jsonEnd = response.rfind('}') + 1 if response else 0
if jsonStart == -1 or jsonEnd == 0:
@@ -290,12 +308,12 @@ class ReactMode(BaseMode):
stage2Context.learnings = []
# Build and send the Stage 2 parameters prompt (always)
- bundle = generateReactParametersPrompt(self.services, stage2Context, compoundActionName)
+ bundle = generateReactParametersPrompt(self.services, stage2Context, compoundActionName, self.adaptiveLearningEngine)
promptTemplate = bundle.prompt
placeholders = bundle.placeholders
- self._writeTraceLog("React Parameters Prompt", promptTemplate)
- self._writeTraceLog("React Parameters Placeholders", placeholders)
+ # Write parameters prompt to debug
+ writeDebugFile(promptTemplate, "parameters_prompt", placeholders)
# Centralized AI call for parameter suggestion (balanced analysis)
options = AiCallOptions(
@@ -324,7 +342,9 @@ class ReactMode(BaseMode):
except Exception as e:
logger.error(f"Failed to parse AI parameters response as JSON: {str(e)}")
logger.error(f"Response was: {paramsResp}")
- parameters = {}
+ raise ValueError("AI parameters response invalid JSON")
+ if not isinstance(parameters, dict):
+ raise ValueError("AI parameters response missing 'parameters' object")
# Merge Stage 1 resource selections into Stage 2 parameters (only if action expects them)
try:
@@ -353,15 +373,12 @@ class ReactMode(BaseMode):
if 'language' not in parameters and hasattr(self.services, 'user') and getattr(self.services.user, 'language', None):
parameters['language'] = self.services.user.language
- # Write merged parameters to trace BEFORE continuing
- try:
- mergedParamObj = {
- "schema": (paramObj.get('schema') if isinstance(paramObj, dict) else 'parameters_v1'),
- "parameters": parameters
- }
- self._writeTraceLog("React Parameters Response", mergedParamObj)
- except Exception:
- pass
+ # Write parameters response to debug
+ mergedParamObj = {
+ "schema": (paramObj.get('schema') if isinstance(paramObj, dict) else 'parameters_v1'),
+ "parameters": parameters
+ }
+ writeDebugFile(str(mergedParamObj), "parameters_response", mergedParamObj)
# Build a synthetic ActionItem for execution routing and labels
currentRound = getattr(self.workflow, 'currentRound', 0)
@@ -614,8 +631,8 @@ class ReactMode(BaseMode):
promptTemplate = bundle.prompt
placeholders = bundle.placeholders
- self._writeTraceLog("React Refinement Prompt", promptTemplate)
- self._writeTraceLog("React Refinement Placeholders", placeholders)
+ # Write refinement/validation prompt to debug
+ writeDebugFile(promptTemplate, "validation_refinement_prompt", placeholders)
# Centralized AI call for refinement decision (balanced analysis)
options = AiCallOptions(
@@ -633,7 +650,8 @@ class ReactMode(BaseMode):
placeholders=placeholders,
options=options
)
- self._writeTraceLog("React Refinement Response", resp)
+ # Write refinement/validation response to debug
+ writeDebugFile(resp or '', "validation_refinement_response")
js = resp[resp.find('{'):resp.rfind('}')+1] if resp else '{}'
try:
decision = json.loads(js)
@@ -688,9 +706,7 @@ class ReactMode(BaseMode):
"actionProgress": actionProgress
}
- message = self.services.interfaceDbChat.createMessage(messageData)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
except Exception as e:
logger.error(f"Error creating React action message: {str(e)}")
@@ -909,7 +925,7 @@ Return only the user-friendly message, no technical details."""
return None
def _writeTraceLog(self, contextText: str, data: Any) -> None:
- """Write trace data to configured trace file if in debug mode with improved JSON formatting"""
+ """Write trace data and categorized React debug files when in DEBUG level"""
try:
import os
import json
@@ -929,8 +945,63 @@ Return only the user-friendly message, no technical details."""
# Ensure log directory exists
os.makedirs(logDir, exist_ok=True)
- # Create trace file path
+ # Create trace file path (aggregate)
traceFile = os.path.join(logDir, "log_trace.log")
+
+ # Derive a React-category filename based on context
+ def _reactFileForContext(text: str) -> str:
+ t = (text or "").lower()
+ if "task plan" in t:
+ if "prompt" in t:
+ return "react_taskplan_prompt.jsonl"
+ if "response" in t:
+ return "react_taskplan_response.jsonl"
+ if "plan selection" in t:
+ if "prompt" in t:
+ return "react_action_selection_prompt.jsonl"
+ if "response" in t:
+ return "react_action_selection_response.jsonl"
+ if "parameters" in t:
+ if "prompt" in t:
+ return "react_parameter_setting_prompt.jsonl"
+ if "response" in t:
+ return "react_parameter_setting_response.jsonl"
+ if "refinement" in t or "review" in t or "decide" in t:
+ # Treat refinement as validation/next-step decision context
+ if "prompt" in t:
+ return "react_validation_prompt.jsonl"
+ if "response" in t:
+ return "react_next_step_decisions.jsonl"
+ if "extraction" in t:
+ if "prompt" in t:
+ return "react_extraction_prompts.jsonl"
+ if "response" in t:
+ return "react_extraction_responses.jsonl"
+ if "generation" in t:
+ if "prompt" in t:
+ return "react_generation_prompts.jsonl"
+ if "response" in t:
+ return "react_generation_responses.jsonl"
+ if "render" in t:
+ if "prompt" in t:
+ return "react_rendering_prompts.jsonl"
+ if "response" in t:
+ return "react_rendering_responses.jsonl"
+ if "validation" in t:
+ if "prompt" in t:
+ return "react_validation_prompts.jsonl"
+ if "response" in t:
+ return "react_validation_responses.jsonl"
+ if "action result" in t:
+ return "react_action_results.jsonl"
+ # Fallback
+ return "react_misc.jsonl"
+
+ # Daily suffix for React files
+ dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d")
+ baseReactFile = _reactFileForContext(contextText)
+ name, ext = os.path.splitext(baseReactFile)
+ reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}")
# Format the trace entry with better structure
timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
@@ -978,6 +1049,28 @@ Return only the user-friendly message, no technical details."""
# Write to trace file
with open(traceFile, "a", encoding="utf-8") as f:
f.write(traceEntry)
+
+ # Also write a compact JSONL record to the categorized React file
+ try:
+ # Add workflow and step context if available
+ workflowId = getattr(getattr(self, 'workflow', None), 'id', None)
+ roundNumber = getattr(getattr(self, 'workflow', None), 'currentRound', None)
+ taskNumber = getattr(getattr(self, 'workflow', None), 'currentTask', None)
+ actionNumber = getattr(getattr(self, 'workflow', None), 'currentAction', None)
+
+ reactRecord = {
+ "timestamp": timestamp,
+ "context": contextText,
+ "workflowId": workflowId,
+ "round": roundNumber,
+ "task": taskNumber,
+ "action": actionNumber,
+ "data": data
+ }
+ with open(reactFile, "a", encoding="utf-8") as rf:
+ rf.write(json.dumps(reactRecord, ensure_ascii=False, default=str) + "\n")
+ except Exception:
+ pass
except Exception as e:
# Don't log trace errors to avoid recursion
diff --git a/modules/workflows/processing/shared/promptGenerationActionsReact.py b/modules/workflows/processing/shared/promptGenerationActionsReact.py
index 10b7f9f6..56f6aaa5 100644
--- a/modules/workflows/processing/shared/promptGenerationActionsReact.py
+++ b/modules/workflows/processing/shared/promptGenerationActionsReact.py
@@ -3,6 +3,7 @@ React Mode Prompt Generation
Handles prompt templates for react mode action handling.
"""
+import json
from typing import Any, List
from modules.datamodels.datamodelChat import PromptBundle, PromptPlaceholder
from modules.workflows.processing.shared.placeholderFactory import (
@@ -19,7 +20,7 @@ from modules.workflows.processing.shared.placeholderFactory import (
)
from modules.workflows.processing.shared.methodDiscovery import methods, getActionParameterList
-def generateReactPlanSelectionPrompt(services, context: Any) -> PromptBundle:
+def generateReactPlanSelectionPrompt(services, context: Any, learningEngine=None) -> PromptBundle:
"""Define placeholders first, then the template; return PromptBundle."""
placeholders: List[PromptPlaceholder] = [
PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False),
@@ -31,6 +32,21 @@ def generateReactPlanSelectionPrompt(services, context: Any) -> PromptBundle:
PromptPlaceholder(label="AVAILABLE_DOCUMENTS_INDEX", content=extractAvailableDocumentsIndex(services, context), summaryAllowed=True),
PromptPlaceholder(label="AVAILABLE_CONNECTIONS_INDEX", content=extractAvailableConnectionsIndex(services), summaryAllowed=False),
]
+
+ # Add adaptive learning context if available
+ adaptiveContext = {}
+ if learningEngine:
+ workflowId = getattr(context, 'workflow_id', 'unknown')
+ userPrompt = extractUserPrompt(context)
+ adaptiveContext = learningEngine.getAdaptiveContextForActionSelection(workflowId, userPrompt)
+
+ if adaptiveContext:
+ # Add learning-aware placeholders
+ placeholders.extend([
+ PromptPlaceholder(label="ADAPTIVE_GUIDANCE", content=adaptiveContext.get('adaptiveGuidance', ''), summaryAllowed=True),
+ PromptPlaceholder(label="FAILURE_ANALYSIS", content=json.dumps(adaptiveContext.get('failureAnalysis', {}), indent=2), summaryAllowed=True),
+ PromptPlaceholder(label="ESCALATION_LEVEL", content=adaptiveContext.get('escalationLevel', 'low'), summaryAllowed=False),
+ ])
template = """Select exactly one next action to advance the task incrementally.
@@ -52,11 +68,26 @@ AVAILABLE_DOCUMENTS_INDEX:
AVAILABLE_CONNECTIONS_INDEX:
{{KEY:AVAILABLE_CONNECTIONS_INDEX}}
+{{#if ADAPTIVE_GUIDANCE}}
+LEARNING-BASED GUIDANCE:
+{{KEY:ADAPTIVE_GUIDANCE}}
+
+{{#if FAILURE_ANALYSIS}}
+FAILURE ANALYSIS:
+{{KEY:FAILURE_ANALYSIS}}
+{{/if}}
+
+ESCALATION LEVEL: {{KEY:ESCALATION_LEVEL}}
+{{/if}}
+
REPLY: Return ONLY a JSON object with the following structure (no comments, no extra text). The chosen action MUST:
- be the next logical incremental step toward fulfilling the objective
- not attempt to complete the entire objective in one step
- if producing files, target exactly one output format for this step
- reference ONLY existing document IDs/labels from AVAILABLE_DOCUMENTS_INDEX
+{{#if ADAPTIVE_GUIDANCE}}
+- learn from previous validation feedback and avoid repeated mistakes
+{{/if}}
{{
"action": "method.action_name",
"actionObjective": "...",
@@ -81,11 +112,15 @@ RULES:
- Copy references EXACTLY as shown in AVAILABLE_DOCUMENTS_INDEX
6. For requiredConnection, use ONLY an exact label from AVAILABLE_CONNECTIONS_INDEX
7. Plan incrementally: if the overall intent needs multiple output formats (e.g., CSV and HTML), choose one format in this step and leave the other(s) for subsequent steps
+{{#if ADAPTIVE_GUIDANCE}}
+8. CRITICAL: Learn from previous validation feedback - avoid repeating the same mistakes
+9. If previous attempts failed, consider alternative approaches or more specific parameters
+{{/if}}
"""
return PromptBundle(prompt=template, placeholders=placeholders)
-def generateReactParametersPrompt(services, context: Any, compoundActionName: str) -> PromptBundle:
+def generateReactParametersPrompt(services, context: Any, compoundActionName: str, learningEngine=None) -> PromptBundle:
"""Define placeholders first, then the template; return PromptBundle.
Minimal Stage 2 (no fallback): consumes actionObjective, selectedAction, parametersContext only.
@@ -166,6 +201,19 @@ Excludes documents/connections/history entirely.
PromptPlaceholder(label="ACTION_PARAMETERS", content=actionParametersText, summaryAllowed=False),
PromptPlaceholder(label="LEARNINGS", content=learningsText, summaryAllowed=True),
]
+
+ # Add adaptive learning context if available
+ adaptiveContext = {}
+ if learningEngine:
+ workflowId = getattr(context, 'workflow_id', 'unknown')
+ adaptiveContext = learningEngine.getAdaptiveContextForParameters(workflowId, compoundActionName, parametersContext or "")
+
+ if adaptiveContext:
+ placeholders.extend([
+ PromptPlaceholder(label="PARAMETER_GUIDANCE", content=adaptiveContext.get('parameterGuidance', ''), summaryAllowed=True),
+ PromptPlaceholder(label="ATTEMPT_NUMBER", content=str(adaptiveContext.get('attemptNumber', 1)), summaryAllowed=False),
+ PromptPlaceholder(label="FAILURE_ANALYSIS", content=json.dumps(adaptiveContext.get('failureAnalysis', {}), indent=2), summaryAllowed=True),
+ ])
template = """You are a parameter generator. Set the parameters for this specific action.
@@ -177,6 +225,19 @@ CONTEXT AND OBJECTIVE:
SELECTED_ACTION:
{{KEY:SELECTED_ACTION}}
+{{#if PARAMETER_GUIDANCE}}
+LEARNING-BASED PARAMETER GUIDANCE:
+{{KEY:PARAMETER_GUIDANCE}}
+
+{{#if ATTEMPT_NUMBER}}
+ATTEMPT NUMBER: {{KEY:ATTEMPT_NUMBER}}
+{{/if}}
+
+{{#if FAILURE_ANALYSIS}}
+PREVIOUS FAILURE ANALYSIS:
+{{KEY:FAILURE_ANALYSIS}}
+{{/if}}
+{{/if}}
REPLY (ONLY JSON):
{{
@@ -203,12 +264,19 @@ INSTRUCTIONS:
- Fill in appropriate values based on the context and objective
- Do NOT invent new parameters
- Do NOT include: documentList, connectionReference, history, documents, connections
+{{#if PARAMETER_GUIDANCE}}
+- CRITICAL: Follow the learning-based parameter guidance above
+- Learn from previous validation failures and adjust parameters accordingly
+{{/if}}
RULES:
- Return ONLY JSON (no markdown, no prose)
- Use ONLY the exact parameter names listed in REQUIRED PARAMETERS FOR THIS ACTION
- Do NOT add any parameters not listed above
- Do NOT add nested objects or custom fields
+{{#if PARAMETER_GUIDANCE}}
+- Apply learning insights to avoid repeated parameter mistakes
+{{/if}}
"""
return PromptBundle(prompt=template, placeholders=placeholders)
@@ -220,26 +288,23 @@ def generateReactRefinementPrompt(services, context: Any, reviewContent: str) ->
PromptPlaceholder(label="REVIEW_CONTENT", content=reviewContent, summaryAllowed=True),
]
- template = """Decide the next step based on the observation.
+ template = """TASK DECISION
-OBJECTIVE:
-{{KEY:USER_PROMPT}}
+OBJECTIVE: '{{KEY:USER_PROMPT}}'
-OBSERVATION:
-{{KEY:REVIEW_CONTENT}}
+DECISION RULES:
+1. "continue" = objective NOT fulfilled
+2. "stop" = objective fulfilled
+3. Return ONLY JSON - no other text
-REPLY: Return only a JSON object with your decision:
+OUTPUT FORMAT (only JSON object to deliver):
{{
-"decision": "continue|stop",
-"reason": "brief explanation"
+ "decision": "continue",
+ "reason": "Brief reason for decision"
}}
-RULES:
-1. Use "continue" if objective NOT fulfilled
-2. Use "stop" if objective fulfilled
-3. Return ONLY JSON - no other text
-4. Do NOT use markdown code blocks
-5. Do NOT add explanations
+OBSERVATION: {{KEY:REVIEW_CONTENT}}
+
"""
return PromptBundle(prompt=template, placeholders=placeholders)
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 7b52ffe9..5c04ba0b 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -3,6 +3,7 @@ import logging
from datetime import datetime, UTC
import uuid
import asyncio
+import json
from modules.datamodels.datamodelChat import (
UserInputRequest,
@@ -38,7 +39,7 @@ class WorkflowManager:
if not workflow:
raise ValueError(f"Workflow {workflowId} not found")
- # Store workflow in services for reference (don't overwrite the workflow service)
+ # Store workflow in services for reference
self.services.currentWorkflow = workflow
if workflow.status == "running":
@@ -49,14 +50,12 @@ class WorkflowManager:
"status": "stopped",
"lastActivity": currentTime
})
- self.services.workflow.createLog({
- "workflowId": workflowId,
- "message": "Workflow stopped for new prompt",
- "type": "info",
- "status": "stopped",
- "progress": 100
- })
- await asyncio.sleep(0.1)
+ self.services.workflow.storeLog(workflow, {
+ "message": "Workflow stopped for new prompt",
+ "type": "info",
+ "status": "stopped",
+ "progress": 100
+ })
newRound = workflow.currentRound + 1
self.services.workflow.updateWorkflow(workflowId, {
@@ -66,27 +65,26 @@ class WorkflowManager:
"workflowMode": workflowMode # Update workflow mode for existing workflows
})
- workflow = self.services.workflow.getWorkflow(workflowId)
- if not workflow:
- raise ValueError(f"Failed to reload workflow {workflowId} after update")
+ # Reflect updates on the in-memory object without reloading
+ workflow.status = "running"
+ workflow.lastActivity = currentTime
+ workflow.currentRound = newRound
+ workflow.workflowMode = workflowMode
- self.services.workflow.createLog({
- "workflowId": workflowId,
+ self.services.workflow.storeLog(workflow, {
"message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}",
"type": "info",
"status": "running",
"progress": 0
})
- # CRITICAL: Update the workflow object's workflowMode attribute for immediate use
- workflow.workflowMode = workflowMode
else:
workflowData = {
"name": "New Workflow",
"status": "running",
"startedAt": currentTime,
"lastActivity": currentTime,
- "currentRound": 0,
+ "currentRound": 1,
"currentTask": 0,
"currentAction": 0,
"totalTasks": 0,
@@ -108,11 +106,8 @@ class WorkflowManager:
workflow = self.services.workflow.createWorkflow(workflowData)
logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}")
logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}")
- workflow.currentRound = 1
- self.services.workflow.updateWorkflow(workflow.id, {"currentRound": 1})
self.services.workflow.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0)
- # Store workflow in services for reference (don't overwrite the workflow service)
self.services.currentWorkflow = workflow
# Start workflow processing asynchronously
@@ -136,8 +131,7 @@ class WorkflowManager:
"status": "stopped",
"lastActivity": workflow.lastActivity
})
- self.services.workflow.createLog({
- "workflowId": workflowId,
+ self.services.workflow.storeLog(workflow, {
"message": "Workflow stopped",
"type": "warning",
"status": "stopped",
@@ -199,159 +193,141 @@ class WorkflowManager:
"taskProgress": "pending",
"actionProgress": "pending"
}
+
+ # Clear trace log for new workflow session
+ self.workflowProcessor.clearTraceLog()
- # Create message first to get messageId
- message = self.services.workflow.createMessage(messageData)
- if message:
- workflow.messages.append(message)
-
- # Clear trace log for new workflow session
- self.workflowProcessor.clearTraceLog()
-
- # Add documents if any, now with messageId
- if userInput.listFileId:
- # Process file IDs and add to message data
- documents = await self._processFileIds(userInput.listFileId, message.id)
- message.documents = documents
- # Update the message with documents in database
- self.services.workflow.updateMessage(message.id, {"documents": [doc.to_dict() for doc in documents]})
+ # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents
+ created_docs = []
- # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents
+ try:
+ analyzerPrompt = (
+ "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
+ "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
+ "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
+ "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
+ "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n\n"
+ "Rules:\n"
+ "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
+ "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
+ "- Preserve critical references (URLs, filenames) in intent.\n"
+ "- Normalize to the primary detected language if mixed-language.\n\n"
+ "Return ONLY JSON (no markdown) with this shape:\n"
+ "{\n"
+ " \"detectedLanguage\": \"de|en|fr|it|...\",\n"
+ " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
+ " \"intent\": \"Concise normalized request...\",\n"
+ " \"contextItems\": [\n"
+ " {\n"
+ " \"title\": \"User context 1\",\n"
+ " \"mimeType\": \"text/plain\",\n"
+ " \"content\": \"Full extracted content block here\"\n"
+ " }\n"
+ " ]\n"
+ "}\n\n"
+ f"User message:\n{userInput.prompt}"
+ )
+
+ # Call AI analyzer
+ aiResponse = await self.services.ai.callAi(prompt=analyzerPrompt)
+
+ detectedLanguage = None
+ normalizedRequest = None
+ intentText = userInput.prompt
+ contextItems = []
+
+ # Parse analyzer response (JSON expected)
try:
- analyzerPrompt = (
- "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
- "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
- "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
- "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
- "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n\n"
- "Rules:\n"
- "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
- "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
- "- Preserve critical references (URLs, filenames) in intent.\n"
- "- Normalize to the primary detected language if mixed-language.\n\n"
- "Return ONLY JSON (no markdown) with this shape:\n"
- "{\n"
- " \"detectedLanguage\": \"de|en|fr|it|...\",\n"
- " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
- " \"intent\": \"Concise normalized request...\",\n"
- " \"contextItems\": [\n"
- " {\n"
- " \"title\": \"User context 1\",\n"
- " \"mimeType\": \"text/plain\",\n"
- " \"content\": \"Full extracted content block here\"\n"
- " }\n"
- " ]\n"
- "}\n\n"
- f"User message:\n{userInput.prompt}"
- )
-
- # Call AI analyzer
- aiResponse = await self.services.ai.callAi(prompt=analyzerPrompt)
-
- detectedLanguage = None
- normalizedRequest = None
- intentText = userInput.prompt
+ jsonStart = aiResponse.find('{') if aiResponse else -1
+ jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
+ if jsonStart != -1 and jsonEnd > jsonStart:
+ parsed = json.loads(aiResponse[jsonStart:jsonEnd])
+ detectedLanguage = parsed.get('detectedLanguage') or None
+ normalizedRequest = parsed.get('normalizedRequest') or None
+ if parsed.get('intent'):
+ intentText = parsed.get('intent')
+ contextItems = parsed.get('contextItems') or []
+ except Exception:
contextItems = []
- # Parse analyzer response (JSON expected)
+ # Update services state
+ if detectedLanguage and isinstance(detectedLanguage, str):
+ self._setUserLanguage(detectedLanguage)
try:
- import json
- jsonStart = aiResponse.find('{') if aiResponse else -1
- jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
- if jsonStart != -1 and jsonEnd > jsonStart:
- parsed = json.loads(aiResponse[jsonStart:jsonEnd])
- detectedLanguage = parsed.get('detectedLanguage') or None
- normalizedRequest = parsed.get('normalizedRequest') or None
- if parsed.get('intent'):
- intentText = parsed.get('intent')
- contextItems = parsed.get('contextItems') or []
+ setattr(self.services, 'currentUserLanguage', detectedLanguage)
except Exception:
- contextItems = []
+ pass
+ self.services.currentUserPrompt = intentText or userInput.prompt
+ try:
+ if normalizedRequest:
+ setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
+ if contextItems is not None:
+ setattr(self.services, 'currentUserContextItems', contextItems)
+ except Exception:
+ pass
- # Update services state
- if detectedLanguage and isinstance(detectedLanguage, str):
- self._setUserLanguage(detectedLanguage)
+ # Telemetry (sizes and counts)
+ try:
+ inputSize = len(userInput.prompt.encode('utf-8')) if userInput and userInput.prompt else 0
+ outputSize = len(aiResponse.encode('utf-8')) if aiResponse else 0
+ self.services.workflow.storeLog(workflow, {
+ "message": f"User prompt analyzed (input {inputSize} bytes, output {outputSize} bytes, items {len(contextItems)})",
+ "type": "info",
+ "status": "running",
+ "progress": 0
+ })
+ except Exception:
+ pass
+
+ # Create documents for context items (in-memory ChatDocument; persistence via storeMessageWithDocuments)
+ if contextItems and isinstance(contextItems, list):
+ for idx, item in enumerate(contextItems):
try:
- setattr(self.services, 'currentUserLanguage', detectedLanguage)
- except Exception:
- pass
- self.services.currentUserPrompt = intentText or userInput.prompt
- try:
- if normalizedRequest:
- setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
- if contextItems is not None:
- setattr(self.services, 'currentUserContextItems', contextItems)
- except Exception:
- pass
-
- # Telemetry (sizes and counts)
- try:
- inputSize = len(userInput.prompt.encode('utf-8')) if userInput and userInput.prompt else 0
- outputSize = len(aiResponse.encode('utf-8')) if aiResponse else 0
- self.services.workflow.createLog({
- "workflowId": workflow.id,
- "message": f"User prompt analyzed (input {inputSize} bytes, output {outputSize} bytes, items {len(contextItems)})",
- "type": "info",
- "status": "running",
- "progress": 0
- })
- except Exception:
- pass
-
- # Create and attach documents for context items
- if contextItems and isinstance(contextItems, list):
- created_docs = []
- for idx, item in enumerate(contextItems):
- try:
- title = item.get('title') if isinstance(item, dict) else None
- mime = item.get('mimeType') if isinstance(item, dict) else None
- content = item.get('content') if isinstance(item, dict) else None
- if not content:
- continue
- fileName = (title or f"user_context_{idx+1}.txt").strip()
- mimeType = (mime or "text/plain").strip()
-
- # Create file in component storage
- content_bytes = content.encode('utf-8')
- file_item = self.services.interfaceDbComponent.createFile(
- name=fileName,
- mimeType=mimeType,
- content=content_bytes
- )
- # Persist file data
- self.services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
-
- # Collect file info
- file_info = self.services.workflow.getFileInfo(file_item.id)
- from modules.datamodels.datamodelChat import ChatDocument as _ChatDocument
- doc = _ChatDocument(
- messageId=message.id,
- fileId=file_item.id,
- fileName=file_info.get("fileName", fileName) if file_info else fileName,
- fileSize=file_info.get("size", len(content_bytes)) if file_info else len(content_bytes),
- mimeType=file_info.get("mimeType", mimeType) if file_info else mimeType
- )
- # Persist document record
- self.services.interfaceDbChat.createDocument(doc.to_dict())
- created_docs.append(doc)
- except Exception:
+ title = item.get('title') if isinstance(item, dict) else None
+ mime = item.get('mimeType') if isinstance(item, dict) else None
+ content = item.get('content') if isinstance(item, dict) else None
+ if not content:
continue
+ fileName = (title or f"user_context_{idx+1}.txt").strip()
+ mimeType = (mime or "text/plain").strip()
- if created_docs:
- # Attach to message and persist
- if not message.documents:
- message.documents = []
- message.documents.extend(created_docs)
- self.services.workflow.updateMessage(message.id, {
- "documents": [d.to_dict() for d in message.documents],
- "documentsLabel": context_label
- })
+ # Create file in component storage
+ content_bytes = content.encode('utf-8')
+ file_item = self.services.interfaceDbComponent.createFile(
+ name=fileName,
+ mimeType=mimeType,
+ content=content_bytes
+ )
+ # Persist file data
+ self.services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
+
+ # Collect file info
+ file_info = self.services.workflow.getFileInfo(file_item.id)
+ from modules.datamodels.datamodelChat import ChatDocument
+ doc = ChatDocument(
+ fileId=file_item.id,
+ fileName=file_info.get("fileName", fileName) if file_info else fileName,
+ fileSize=file_info.get("size", len(content_bytes)) if file_info else len(content_bytes),
+ mimeType=file_info.get("mimeType", mimeType) if file_info else mimeType
+ )
+ created_docs.append(doc)
+ except Exception:
+ continue
+ except Exception as e:
+ logger.warning(f"Prompt analysis failed or skipped: {str(e)}")
+
+ # Process user-uploaded documents (fileIds) and combine with context documents
+ if userInput.listFileId:
+ try:
+ user_docs = await self._processFileIds(userInput.listFileId, None)
+ if user_docs:
+ created_docs.extend(user_docs)
except Exception as e:
- logger.warning(f"Prompt analysis failed or skipped: {str(e)}")
-
- return message
- else:
- raise Exception("Failed to create first message")
+ logger.warning(f"Failed to process user fileIds: {e}")
+
+ # Finally, persist and bind the first message with combined documents (context + user)
+ created_message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs)
+ return created_message
except Exception as e:
logger.error(f"Error sending first message: {str(e)}")
@@ -444,9 +420,7 @@ class WorkflowManager:
"taskProgress": "stopped",
"actionProgress": "stopped"
}
- message = self.services.workflow.createMessage(stopped_message)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
# Update workflow status to stopped
workflow.status = "stopped"
@@ -476,9 +450,7 @@ class WorkflowManager:
"taskProgress": "stopped",
"actionProgress": "stopped"
}
- message = self.services.workflow.createMessage(stopped_message)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
# Update workflow status to stopped
workflow.status = "stopped"
@@ -491,8 +463,7 @@ class WorkflowManager:
})
# Add stopped log entry
- self.services.workflow.createLog({
- "workflowId": workflow.id,
+ self.services.workflow.storeLog(workflow, {
"message": "Workflow stopped by user",
"type": "warning",
"status": "stopped",
@@ -518,9 +489,7 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- message = self.services.workflow.createMessage(error_message)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
# Update workflow status to failed
workflow.status = "failed"
@@ -533,8 +502,7 @@ class WorkflowManager:
})
# Add failed log entry
- self.services.workflow.createLog({
- "workflowId": workflow.id,
+ self.services.workflow.storeLog(workflow, {
"message": "Workflow failed: Unknown error",
"type": "error",
"status": "failed",
@@ -565,9 +533,7 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- message = self.services.workflow.createMessage(error_message)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
# Update workflow status to failed
workflow.status = "failed"
@@ -610,9 +576,9 @@ class WorkflowManager:
}
# Create message using interface
- message = self.services.workflow.createMessage(messageData)
+ message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, message.__dict__, getattr(message, 'documents', []))
# Update workflow status to completed
workflow.status = "completed"
@@ -625,8 +591,7 @@ class WorkflowManager:
})
# Add completion log entry
- self.services.workflow.createLog({
- "workflowId": workflow.id,
+ self.services.workflow.storeLog(workflow, {
"message": "Workflow completed",
"type": "success",
"status": "completed",
@@ -696,13 +661,10 @@ class WorkflowManager:
"taskProgress": "pending",
"actionProgress": "pending"
}
- message = self.services.workflow.createMessage(stopped_message)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
# Add log entry
- self.services.workflow.createLog({
- "workflowId": workflow.id,
+ self.services.workflow.storeLog(workflow, {
"message": "Workflow stopped by user",
"type": "warning",
"status": "stopped",
@@ -741,13 +703,10 @@ class WorkflowManager:
"taskProgress": "fail",
"actionProgress": "fail"
}
- message = self.services.workflow.createMessage(error_message)
- if message:
- workflow.messages.append(message)
+ self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
# Add error log entry
- self.services.workflow.createLog({
- "workflowId": workflow.id,
+ self.services.workflow.storeLog(workflow, {
"message": f"Workflow failed: {str(error)}",
"type": "error",
"status": "failed",