diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 3e5932f3..91d9b4a7 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -318,49 +318,9 @@ class ChatObjects: # Update main workflow in database updated = self.db.recordModify(ChatWorkflow, workflowId, simple_fields) - - - # Handle object field updates (inline to avoid helper dependency) - if 'logs' in object_fields: - logs_data = object_fields['logs'] - try: - for log_data in logs_data: - if hasattr(log_data, 'model_dump'): - log_dict = log_data.model_dump() # Pydantic v2 - elif hasattr(log_data, 'dict'): - log_dict = log_data.dict() # Pydantic v1 - elif hasattr(log_data, 'to_dict'): - log_dict = log_data.to_dict() - else: - log_dict = log_data - log_dict["workflowId"] = workflowId - self.createLog(log_dict) - except Exception as e: - logger.error(f"Error updating workflow logs: {str(e)}") - if 'messages' in object_fields: - messages_data = object_fields['messages'] - try: - for message_data in messages_data: - if hasattr(message_data, 'model_dump'): - msg_dict = message_data.model_dump() # Pydantic v2 - elif hasattr(message_data, 'dict'): - msg_dict = message_data.dict() # Pydantic v1 - elif hasattr(message_data, 'to_dict'): - msg_dict = message_data.to_dict() - else: - msg_dict = message_data - msg_dict["workflowId"] = workflowId - self.updateMessage(msg_dict.get("id"), msg_dict) - except Exception as e: - logger.error(f"Error updating workflow messages: {str(e)}") - if 'stats' in object_fields: - stats_data = object_fields['stats'] - try: - if stats_data: - stats_data["workflowId"] = workflowId - self.db.recordCreate(ChatStat, stats_data) - except Exception as e: - logger.error(f"Error updating workflow stats: {str(e)}") + + # Removed cascade writes for logs/messages/stats during workflow update. + # CUD for child entities must be executed via dedicated service methods. 
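For reference, the dedicated service methods this comment points to are introduced in modules/services/serviceWorkflow/mainServiceWorkflow.py further down in this patch. A minimal caller-side sketch, assuming the usual services wiring (the payload keys are illustrative, not exhaustive):

    workflow = services.currentWorkflow

    # Logs: persisted via the chat interface, then mirrored into workflow.logs
    services.workflow.storeLog(workflow, {"message": "extraction started"})

    # Message plus documents: persisted together, then bound into workflow.messages
    message = services.workflow.storeMessageWithDocuments(
        workflow, {"documentsLabel": "Round 1 results"}, []
    )

    # Workflow-level stats: counters updated incrementally, then workflow.stats refreshed
    services.workflow.storeWorkflowStat(
        workflow, {"bytesSent": 0, "bytesReceived": 0, "tokenCount": 1234}
    )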
# Load fresh data from normalized tables logs = self.getLogs(workflowId) diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py index 3f245334..5d7a94ac 100644 --- a/modules/services/serviceAi/subCoreAi.py +++ b/modules/services/serviceAi/subCoreAi.py @@ -75,13 +75,7 @@ class SubCoreAi: else: full_prompt = prompt - self._writeAiResponseDebug( - label='ai_prompt_debug', - content=full_prompt, - partIndex=1, - modelName=None, - continuation=False - ) + # Timestamp-only prompt debug writing removed except Exception: pass @@ -473,38 +467,9 @@ class SubCoreAi: return full_prompt - def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None: - """Persist raw AI response parts for debugging under test-chat/ai - only if debug enabled.""" - try: - # Check if debug logging is enabled - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if not debug_enabled: - return - - import os - from datetime import datetime, UTC - # Base dir: gateway/test-chat/ai (go up 4 levels from this file) - # .../gateway/modules/services/serviceAi/subCoreAi.py -> up to gateway root - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - outDir = os.path.join(gatewayDir, 'test-chat', 'ai') - os.makedirs(outDir, exist_ok=True) - ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3] - suffix = [] - if partIndex is not None: - suffix.append(f"part{partIndex}") - if continuation is not None: - suffix.append(f"cont_{str(continuation).lower()}") - if modelName: - safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName) - suffix.append(safeModel) - suffixStr = ('_' + '_'.join(suffix)) if suffix else '' - fname = f"{ts}_{label}{suffixStr}.txt" - fpath = os.path.join(outDir, fname) - with open(fpath, 'w', encoding='utf-8') as f: - f.write(content or '') - except Exception: - # Do not raise; best-effort debug write - pass + def _writeAiResponseDebug(self, label: str, content: Any, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None: + """Disabled verbose debug writing; only minimal files elsewhere.""" + return def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: """ diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py index 6d7ee4b7..504ba154 100644 --- a/modules/services/serviceAi/subDocumentGeneration.py +++ b/modules/services/serviceAi/subDocumentGeneration.py @@ -157,6 +157,20 @@ class SubDocumentGeneration: # Call AI to enhance the content response = await self.aiObjects.call(request) + # Save generation prompt and response to debug + try: + from modules.shared.debugLogger import writeDebugFile + debugData = { + "output_format": outputFormat, + "title": title, + "context_length": len(context), + "extracted_content_keys": list(aiResponseJson.keys()) if isinstance(aiResponseJson, dict) else [] + } + writeDebugFile(generationPrompt, "generation_single", debugData) + writeDebugFile(response.content or '', "generation_single_response") + except Exception: + pass + if response and response.content: # Parse the AI response as JSON try: @@ -360,6 +374,21 @@ class SubDocumentGeneration: # Call AI to enhance the content response = await self.aiObjects.call(request) + # Save generation prompt and response to debug + try: + from modules.shared.debugLogger import 
writeDebugFile + debugData = { + "output_format": outputFormat, + "title": doc_data["title"], + "document_index": i, + "context_length": len(context), + "extracted_content_keys": list(complete_document.keys()) if isinstance(complete_document, dict) else [] + } + writeDebugFile(generationPrompt, f"generation_multi_doc_{i}", debugData) + writeDebugFile(response.content or '', f"generation_multi_doc_{i}_response") + except Exception: + pass + if response and response.content: # Parse the AI response as JSON try: @@ -659,7 +688,7 @@ Return only the JSON response. "documentsLabel": label, "documents": [] } - message = services.workflow.createMessage(messageData) + message = services.workflow.storeMessageWithDocuments(services.workflow.workflow, messageData, []) if not message: return diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py index d85a5341..44dcc41c 100644 --- a/modules/services/serviceAi/subDocumentProcessing.py +++ b/modules/services/serviceAi/subDocumentProcessing.py @@ -104,17 +104,10 @@ class SubDocumentProcessing: # FIXED: Merge with preserved chunk relationships mergedContent = self._mergeChunkResults(chunkResults, options) - # Save merged extraction content to debug file - only if debug enabled + # Save merged extraction content to debug try: - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - import os - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - debug_root = "./test-chat/ai" - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, f"{ts}_extraction_merged.txt"), "w", encoding="utf-8") as f: - f.write(mergedContent or "") + from modules.shared.debugLogger import writeDebugFile + writeDebugFile(mergedContent or '', "extraction_merged") except Exception: pass @@ -204,18 +197,12 @@ class SubDocumentProcessing: logger.debug(f"Normalization error type: {type(e).__name__}") # Continue with original merged JSON instead of re-raising - # Save merged JSON extraction content to debug file - only if debug enabled + # Save merged JSON extraction content to debug try: - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - import os - import json as _json - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - debug_root = "./test-chat/ai" - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, f"{ts}_extraction_merged.json"), "w", encoding="utf-8") as f: - f.write(_json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2)) + from modules.shared.debugLogger import writeDebugFile + import json as _json + jsonStr = _json.dumps(mergedJsonDocument, ensure_ascii=False, indent=2) + writeDebugFile(jsonStr, "extraction_merged_json", mergedJsonDocument) except Exception: pass @@ -532,21 +519,19 @@ class SubDocumentProcessing: # Log extraction response self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE") - # Save full extraction prompt and response to debug file - only if debug enabled - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - try: - import os - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - debug_root = "./test-chat/ai" - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, 
f"{ts}_extraction_container_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f: - f.write(f"EXTRACTION PROMPT:\n{prompt}\n\n") - f.write(f"EXTRACTION CONTEXT:\n{part.data if part.data else 'No context'}\n\n") - f.write(f"EXTRACTION RESPONSE:\n{ai_result if ai_result else 'No response'}\n") - except Exception: - pass + # Save extraction prompt and response to debug + try: + from modules.shared.debugLogger import writeDebugFile + debugData = { + "chunk_index": chunk_index, + "mime_type": part.mimeType, + "type_group": part.typeGroup, + "context_length": len(part.data) if part.data else 0 + } + writeDebugFile(augmented_prompt, f"extraction_chunk_{chunk_index}", debugData) + writeDebugFile(ai_result or '', f"extraction_chunk_{chunk_index}_response") + except Exception: + pass # If generating JSON, validate the response if generate_json: @@ -641,19 +626,19 @@ class SubDocumentProcessing: # Log extraction response length self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE") - # Save extraction response to debug file (without verbose prompt) - only if debug enabled - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - try: - import os - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - debug_root = "./test-chat/ai" - os.makedirs(debug_root, exist_ok=True) - with open(os.path.join(debug_root, f"{ts}_extraction_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f: - f.write(f"EXTRACTION RESPONSE:\n{ai_result if ai_result else 'No response'}\n") - except Exception: - pass + # Save extraction prompt and response to debug + try: + from modules.shared.debugLogger import writeDebugFile + debugData = { + "chunk_index": chunk_index, + "mime_type": part.mimeType, + "type_group": part.typeGroup, + "context_length": len(part.data) if part.data else 0 + } + writeDebugFile(augmented_prompt_text, f"extraction_chunk_{chunk_index}", debugData) + writeDebugFile(ai_result or '', f"extraction_chunk_{chunk_index}_response") + except Exception: + pass # If generating JSON, validate the response if generate_json: diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py index 9b18ea88..bb654b5e 100644 --- a/modules/services/serviceExtraction/subPipeline.py +++ b/modules/services/serviceExtraction/subPipeline.py @@ -101,42 +101,7 @@ def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: Chunker logger.debug(f"runExtraction: Final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})") logger.debug(f"runExtraction - Final parts: {len(parts)} (chunks: {len(chunk_parts)})") - # DEBUG: dump parts and chunks to files - only if debug enabled - try: - debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - base_dir = "./test-chat/ai" - os.makedirs(base_dir, exist_ok=True) - - # Generate timestamp for consistent naming - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3] - - # Write a summary file - summary_lines: List[str] = [f"fileName: {fileName}", f"mimeType: {mimeType}", f"totalParts: {len(parts)}"] - text_index = 0 - for idx, part in enumerate(parts): - is_texty = part.typeGroup in ("text", "table", "structure") - size = int(part.metadata.get("size", 0) or 0) - is_chunk = bool(part.metadata.get("chunk", False)) - summary_lines.append( - f"part[{idx}]: typeGroup={part.typeGroup}, 
label={part.label}, size={size}, chunk={is_chunk}" - ) - if is_texty and getattr(part, "data", None): - text_index += 1 - fname = f"{ts}_extract_{fileName}_part_{idx:03d}_{'chunk' if is_chunk else 'full'}_{text_index:03d}.txt" - fpath = os.path.join(base_dir, fname) - with open(fpath, "w", encoding="utf-8") as f: - f.write(f"# typeGroup: {part.typeGroup}\n# label: {part.label}\n# chunk: {is_chunk}\n# size: {size}\n\n") - f.write(str(part.data)) - - # Write summary file - summary_fname = f"{ts}_extract_{fileName}_summary.txt" - summary_fpath = os.path.join(base_dir, summary_fname) - with open(summary_fpath, "w", encoding="utf-8") as f: - f.write("\n".join(summary_lines)) - except Exception as _e: - logger.debug(f"Debug dump skipped: {_e}") + # Timestamp-only extraction debug dumps removed return ContentExtracted(id=makeId(), parts=parts) diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py index d38cea96..53e8a848 100644 --- a/modules/services/serviceGeneration/mainServiceGeneration.py +++ b/modules/services/serviceGeneration/mainServiceGeneration.py @@ -319,27 +319,7 @@ class GenerationService: if "sections" not in extractedContent: raise ValueError("extractedContent must contain 'sections' field") - # DEBUG: Log renderer input metadata only (no verbose JSON) - only if debug enabled - try: - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - import os, json - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - debug_root = "./test-chat/ai" - debug_dir = os.path.join(debug_root, f"render_input_{ts}") - os.makedirs(debug_dir, exist_ok=True) - with open(os.path.join(debug_dir, "meta.txt"), "w", encoding="utf-8") as f: - f.write(f"title: {title}\nformat: {outputFormat}\ncontent_type: {type(extractedContent).__name__}\n") - f.write(f"content_size: {len(str(extractedContent))} characters\n") - f.write(f"sections_count: {len(extractedContent.get('sections', []))}\n") - # Also write the extracted content JSON for inspection - try: - with open(os.path.join(debug_dir, "extracted_content.json"), "w", encoding="utf-8") as jf: - json.dump(extractedContent, jf, ensure_ascii=False, indent=2) - except Exception: - pass - except Exception: - pass + # Remove extra debug file writes for render inputs per simplification # Get the appropriate renderer for the format renderer = self._getFormatRenderer(outputFormat) @@ -348,13 +328,7 @@ class GenerationService: # Render the JSON content directly (AI generation handled by main service) renderedContent, mimeType = await renderer.render(extractedContent, title, userPrompt, aiService) - # DEBUG: dump rendered output - try: - import os - with open(os.path.join(debug_dir, "rendered_output.txt"), "w", encoding="utf-8") as f: - f.write(renderedContent or "") - except Exception: - pass + # Remove extra debug output file writes logger.info(f"Successfully rendered JSON report to {outputFormat} format: {len(renderedContent)} characters") return renderedContent, mimeType diff --git a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py index 150a903b..24728df4 100644 --- a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py +++ b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py @@ -91,16 +91,25 @@ class BaseRenderer(ABC): return True def _get_section_type(self, section: Dict[str, Any]) -> str: - """Get the type of a 
section.""" - return section.get("content_type", "paragraph") + """Get the type of a section; default to 'paragraph' for non-dict inputs.""" + if isinstance(section, dict): + return section.get("content_type", "paragraph") + # If section is a list or any other type, treat as paragraph elements + return "paragraph" def _get_section_data(self, section: Dict[str, Any]) -> List[Dict[str, Any]]: - """Get the elements of a section.""" - return section.get("elements", []) + """Get the elements of a section; if a list is provided directly, return it.""" + if isinstance(section, dict): + return section.get("elements", []) + if isinstance(section, list): + return section + return [] def _get_section_id(self, section: Dict[str, Any]) -> str: """Get the ID of a section (if available).""" - return section.get("id", "unknown") + if isinstance(section, dict): + return section.get("id", "unknown") + return "unknown" def _extract_table_data(self, section_data: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]: """Extract table headers and rows from section data.""" @@ -332,6 +341,18 @@ class BaseRenderer(ABC): response = await ai_service.aiObjects.call(request) + # Save styling prompt and response to debug + try: + from modules.shared.debugLogger import writeDebugFile + debugData = { + "template_length": len(style_template), + "default_styles_keys": list(default_styles.keys()) if isinstance(default_styles, dict) else [] + } + writeDebugFile(style_template, "renderer_styling", debugData) + writeDebugFile(response.content or '', "renderer_styling_response") + except Exception: + pass + import json import re diff --git a/modules/services/serviceGeneration/renderers/rendererImage.py b/modules/services/serviceGeneration/renderers/rendererImage.py index 863a52e2..f54e8f82 100644 --- a/modules/services/serviceGeneration/renderers/rendererImage.py +++ b/modules/services/serviceGeneration/renderers/rendererImage.py @@ -59,6 +59,18 @@ class RendererImage(BaseRenderer): # Create AI prompt for image generation image_prompt = await self._create_image_generation_prompt(extracted_content, document_title, user_prompt, ai_service) + # Save image generation prompt to debug + try: + from modules.shared.debugLogger import writeDebugFile + debugData = { + "title": document_title, + "user_prompt_length": len(user_prompt) if user_prompt else 0, + "extracted_content_keys": list(extracted_content.keys()) if isinstance(extracted_content, dict) else [] + } + writeDebugFile(image_prompt, "renderer_image_generation", debugData) + except Exception: + pass + # Generate image using AI image_result = await ai_service.aiObjects.generateImage( prompt=image_prompt, @@ -67,6 +79,18 @@ class RendererImage(BaseRenderer): style="vivid" ) + # Save image generation response to debug + try: + from modules.shared.debugLogger import writeDebugFile + responseData = { + "success": image_result.get("success", False) if image_result else False, + "has_image_data": bool(image_result.get("image_data", "")) if image_result else False, + "result_keys": list(image_result.keys()) if isinstance(image_result, dict) else [] + } + writeDebugFile(str(image_result), "renderer_image_generation_response", responseData) + except Exception: + pass + # Extract base64 image data from result if image_result and image_result.get("success", False): image_data = image_result.get("image_data", "") diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index fe4b4d0f..e2f03dbb 100644 --- 
a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -2,7 +2,7 @@ import logging import uuid from typing import Dict, Any, List, Optional from modules.datamodels.datamodelUam import User, UserConnection -from modules.datamodels.datamodelChat import ChatDocument, ChatMessage +from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat from modules.datamodels.datamodelChat import ChatContentExtracted from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData @@ -79,7 +79,29 @@ class WorkflowService: """Get ChatDocuments from a list of document references using all three formats.""" try: workflow = self.services.currentWorkflow + logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}") logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'}") + + # Debug: list available messages with their labels and document names + try: + if workflow and hasattr(workflow, 'messages') and workflow.messages: + msg_lines = [] + for message in workflow.messages: + label = getattr(message, 'documentsLabel', None) + doc_names = [] + if getattr(message, 'documents', None): + for doc in message.documents: + name = getattr(doc, 'fileName', None) or getattr(doc, 'documentName', None) or 'Unnamed' + doc_names.append(name) + msg_lines.append( + f"- id={getattr(message, 'id', None)}, label={label}, docs={doc_names}" + ) + if msg_lines: + logger.debug("getChatDocumentsFromDocumentList: available messages:\n" + "\n".join(msg_lines)) + else: + logger.debug("getChatDocumentsFromDocumentList: no messages available on current workflow") + except Exception as e: + logger.debug(f"getChatDocumentsFromDocumentList: unable to enumerate messages for debug: {e}") all_documents = [] for doc_ref in documentList: @@ -482,24 +504,86 @@ class WorkflowService: logger.error(f"Error getting workflow: {str(e)}") raise - def createMessage(self, messageData: Dict[str, Any]): - """Create a new message by delegating to the chat interface and append to in-memory workflow.""" + # === Service-level transactions (DB write-through + in-memory sync) === + + def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]): + """Persist message and documents, then bind them into in-memory workflow (replace-by-id).""" + # Ensure workflowId on message + messageData = dict(messageData or {}) + messageData["workflowId"] = workflow.id + # Attach documents to message creation via interface (it persists message then docs) + messageDataWithDocs = dict(messageData) + messageDataWithDocs["documents"] = documents or [] + chatInterface = self.interfaceDbChat + chatMessage = chatInterface.createMessage(messageDataWithDocs) + if not chatMessage: + raise ValueError("Failed to create message with documents") + # In-memory sync: replace or append + # replace-by-id if exists + replaced = False + for i, m in enumerate(workflow.messages or []): + if getattr(m, 'id', None) == getattr(chatMessage, 'id', None): + workflow.messages[i] = chatMessage + replaced = True + break + if not replaced: + workflow.messages.append(chatMessage) + return chatMessage + + def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> Any: + """Persist ChatLog and map it into the in-memory workflow 
logs list.""" + logData = dict(logData or {}) + logData["workflowId"] = workflow.id + chatInterface = self.interfaceDbChat + chatLog = chatInterface.createLog(logData) + if not chatLog: + raise ValueError("Failed to create log") + # replace-by-id if exists + replaced = False + for i, lg in enumerate(workflow.logs): + if getattr(lg, 'id', None) == getattr(chatLog, 'id', None): + workflow.logs[i] = chatLog + replaced = True + break + if not replaced: + workflow.logs.append(chatLog) + return chatLog + + def storeWorkflowStat(self, workflow: Any, statData: Dict[str, Any]) -> Any: + """Persist workflow-level ChatStat and set/replace on in-memory workflow.""" + statData = dict(statData or {}) + statData["workflowId"] = workflow.id + chatInterface = self.interfaceDbChat + # Reuse updateWorkflowStats for incremental or create raw record when needed try: - message = self.interfaceDbChat.createMessage(messageData) - try: - # Keep in-memory workflow messages in sync - workflow = getattr(self.services, 'currentWorkflow', None) - if workflow and hasattr(workflow, 'messages') and message: - # Avoid duplicates if same message was already appended - if not any(getattr(m, 'id', None) == getattr(message, 'id', None) for m in workflow.messages): - workflow.messages.append(message) - except Exception: - # Never fail if local append has issues - pass - return message + self.updateWorkflowStats(workflow.id, **{ + 'bytesSent': statData.get('bytesSent', 0), + 'bytesReceived': statData.get('bytesReceived', 0), + 'tokenCount': statData.get('tokenCount', 0) + }) + except Exception: + pass + stat = chatInterface.getWorkflowStats(workflow.id) + workflow.stats = stat + return stat + + def storeMessageStat(self, workflow: Any, messageId: str, statData: Dict[str, Any]) -> Any: + """Persist message-level ChatStat and bind to the message in-memory.""" + statData = dict(statData or {}) + statData["workflowId"] = workflow.id + statData["messageId"] = messageId + # Persist as ChatStat row + try: + self.interfaceDbChat.db.recordCreate(ChatStat, statData) except Exception as e: - logger.error(f"Error creating message: {str(e)}") + logger.error(f"Failed to persist message stat: {e}") raise + stat = self.interfaceDbChat.getMessageStats(messageId) + for m in workflow.messages or []: + if getattr(m, 'id', None) == messageId: + m.stats = stat + break + return stat def updateMessage(self, messageId: str, messageData: Dict[str, Any]): """Update message by delegating to the chat interface""" @@ -509,29 +593,6 @@ class WorkflowService: logger.error(f"Error updating message: {str(e)}") raise - def createLog(self, logData: Dict[str, Any]): - """Create a new log entry by delegating to the chat interface and append to in-memory workflow logs.""" - try: - log_entry = self.interfaceDbChat.createLog(logData) - try: - workflow = getattr(self.services, 'currentWorkflow', None) - if workflow and hasattr(workflow, 'logs') and log_entry: - # Avoid duplicates by id if present, else compare message+timestamp tuple - get_id = getattr(log_entry, 'id', None) - if get_id is not None: - if not any(getattr(l, 'id', None) == get_id for l in workflow.logs): - workflow.logs.append(log_entry) - else: - key = (getattr(log_entry, 'message', None), getattr(log_entry, 'publishedAt', None)) - if not any((getattr(l, 'message', None), getattr(l, 'publishedAt', None)) == key for l in workflow.logs): - workflow.logs.append(log_entry) - except Exception: - pass - return log_entry - except Exception as e: - logger.error(f"Error creating log: {str(e)}") - raise - def 
getDocumentCount(self) -> str: """Get document count for task planning (matching old handlingTasks.py logic)""" try: @@ -609,30 +670,7 @@ class WorkflowService: # Get document reference list using the exact same logic as old system document_list = self._getDocumentReferenceList(workflow) - # Optional: dump a concise document index for debugging - try: - debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) - if debug_enabled: - import os, json - from datetime import datetime, UTC - ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - debug_root = "./test-chat/ai" - os.makedirs(debug_root, exist_ok=True) - doc_index = [] - for bucket in ("chat", "history"): - for ex in document_list.get(bucket, []) or []: - doc_index.append({ - "bucket": bucket, - "label": ex.get("documentsLabel"), - "documents": ex.get("documents", []) - }) - with open(os.path.join(debug_root, f"{ts}_available_documents_index.json"), "w", encoding="utf-8") as f: - json.dump({ - "workflowId": getattr(workflow, 'id', None), - "index": doc_index - }, f, ensure_ascii=False, indent=2) - except Exception: - pass + # Timestamp-only available documents index dump removed # Build index string for AI action planning context = "" @@ -723,43 +761,50 @@ class WorkflowService: except Exception: return False + # Simplified, deterministic logic: + # - Walk messages newest-first + # - For each document, assign it exactly once to a bucket based on the message round + # - Never allow the same doc to appear in both buckets chat_exchanges = [] history_exchanges = [] - - in_current_round = True + seen_doc_ids = set() + current_round = getattr(workflow, 'currentRound', None) + for message in reversed(workflow.messages): - is_first = message.status == "first" if hasattr(message, 'status') else False - - doc_exchange = None - if message.documents: - existing_label = getattr(message, 'documentsLabel', None) - if existing_label: - validated_label = self._validateDocumentLabelConsistency(message) - doc_refs = [] - for doc in message.documents: - if not _is_valid_document(doc): - # Skip empty/invalid docs - continue - doc_ref = self._getDocumentReferenceFromChatDocument(doc, message) - doc_refs.append(doc_ref) - if doc_refs: - doc_exchange = { - 'documentsLabel': validated_label, - 'documents': doc_refs - } - - if doc_exchange: - if in_current_round: - chat_exchanges.append(doc_exchange) - else: - history_exchanges.append(doc_exchange) - - if in_current_round and is_first: - in_current_round = False - - chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True) - history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True) - + if not getattr(message, 'documents', None): + continue + + label = getattr(message, 'documentsLabel', None) + if not label: + # Skip messages without a label to keep references consistent + continue + + doc_refs = [] + for doc in message.documents: + if not _is_valid_document(doc): + continue + # Avoid duplicates across chat/history + doc_id = getattr(doc, 'id', None) + if not doc_id or doc_id in seen_doc_ids: + continue + seen_doc_ids.add(doc_id) + doc_ref = self.getDocumentReferenceFromChatDocument(doc) + doc_refs.append(doc_ref) + + if not doc_refs: + continue + + entry = { + 'documentsLabel': label, + 'documents': doc_refs + } + + msg_round = getattr(message, 'roundNumber', None) + if current_round is not None and msg_round == current_round: + chat_exchanges.append(entry) + else: + history_exchanges.append(entry) + 
return { "chat": chat_exchanges, "history": history_exchanges @@ -803,7 +848,7 @@ class WorkflowService: action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0 return f"round{round_num}_task{task_num}_action{action_num}" - def _getDocumentReferenceFromChatDocument(self, document, message) -> str: + def getDocumentReferenceFromChatDocument(self, document) -> str: """Get document reference using document ID and filename.""" try: # Use document ID and filename for simple reference diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py new file mode 100644 index 00000000..fb8bde68 --- /dev/null +++ b/modules/shared/debugLogger.py @@ -0,0 +1,123 @@ +""" +Simple debug logger for AI prompts and responses. +Writes files chronologically to gateway/test-chat/ai/ with sequential numbering. +""" +import os +import json +from datetime import datetime, UTC +from typing import Any, Optional + + +def _getDebugDir() -> str: + """Get the debug directory path.""" + gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + return os.path.join(gatewayDir, 'test-chat', 'ai') + + +def _getNextSequenceNumber() -> int: + """Get the next sequence number by counting existing files.""" + debugDir = _getDebugDir() + if not os.path.exists(debugDir): + return 1 + + # Count existing numbered files + files = [f for f in os.listdir(debugDir) if f.startswith(('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'))] + return len(files) + 1 + + +def _formatJsonReadable(data: Any) -> str: + """ + Format JSON data in a readable line-by-line structure. + Handles both structured objects and text representations of dicts/lists. + + Args: + data: The data to format + + Returns: + Formatted string representation + """ + try: + # First try to parse if it's a string representation + if isinstance(data, str): + try: + # Try to parse as JSON first + parsed = json.loads(data) + data = parsed + except json.JSONDecodeError: + # Try to evaluate as Python literal (for dict/list strings) + try: + import ast + parsed = ast.literal_eval(data) + if isinstance(parsed, (dict, list)): + data = parsed + except (ValueError, SyntaxError): + # If all parsing fails, treat as plain text + pass + + # Convert to JSON string with proper indentation + if isinstance(data, (dict, list)): + jsonStr = json.dumps(data, ensure_ascii=False, default=str, indent=2) + else: + jsonStr = str(data) + + # Split into lines and add line numbers for better readability + lines = jsonStr.split('\n') + formattedLines = [] + + for i, line in enumerate(lines, 1): + # Add line number and proper spacing + lineNum = f"{i:3d}: " + formattedLines.append(f"{lineNum}{line}") + + return '\n'.join(formattedLines) + except Exception: + # Fallback to string representation if JSON formatting fails + return str(data) + + +def writeDebugFile(content: str, fileType: str, data: Optional[Any] = None) -> None: + """ + Write debug content to a file with sequential numbering. 
+ + Args: + content: The main content to write + fileType: Type of file (e.g., 'prompt', 'response', 'placeholders') + data: Optional additional data to include as JSON + """ + try: + debugDir = _getDebugDir() + os.makedirs(debugDir, exist_ok=True) + + seqNum = _getNextSequenceNumber() + ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S') + # Add 3-digit sequence number for uniqueness + tsWithSeq = f"{ts}-{seqNum:03d}" + + filename = f"{tsWithSeq}-{fileType}.txt" + filepath = os.path.join(debugDir, filename) + + with open(filepath, 'w', encoding='utf-8') as f: + f.write(content) + + # If structured data provided, also append a human-readable section to the main .txt + try: + if data is not None: + formatted = _formatJsonReadable(data) + with open(filepath, 'a', encoding='utf-8') as f: + f.write("\n\n=== FORMATTED DATA (human-readable) ===\n") + f.write(formatted) + f.write("\n") + except Exception: + pass + + # If additional data provided, write it as a separate JSON file with readable formatting + if data is not None: + jsonFilename = f"{tsWithSeq}-{fileType}_data.json" + jsonFilepath = os.path.join(debugDir, jsonFilename) + with open(jsonFilepath, 'w', encoding='utf-8') as f: + formattedData = _formatJsonReadable(data) + f.write(formattedData) + + except Exception as e: + # Silent fail - don't break the main flow + pass diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py index f613cbe1..9909bd9f 100644 --- a/modules/workflows/methods/methodOutlook.py +++ b/modules/workflows/methods/methodOutlook.py @@ -1143,51 +1143,46 @@ class MethodOutlook(MethodBase): chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) # Create AI prompt for email composition - # Build document reference list for AI + # Build document reference list for AI with expanded list contents when possible doc_references = documentList doc_list_text = "" if doc_references: - doc_list_text = f"Available_Document_References: {', '.join(doc_references)}" + lines = ["Available_Document_References:"] + workflow_obj = getattr(self.services, 'currentWorkflow', None) + for ref in doc_references: + # Each item is a label: resolve to its document list and render contained items + list_docs = self.services.workflow.getChatDocumentsFromDocumentList([ref]) or [] + if list_docs: + for d in list_docs: + doc_ref_label = self.services.workflow.getDocumentReferenceFromChatDocument(d) + lines.append(f"- {doc_ref_label}") + else: + lines.append(" - (no documents)") + doc_list_text = "\n" + "\n".join(lines) else: doc_list_text = "Available_Document_References: (No documents available for attachment)" # Escape only the user-controlled context to prevent prompt injection escaped_context = context.replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r') - ai_prompt = f""" -Compose a professional email based on the following context and requirements: - -CONTEXT: ----------------- + ai_prompt = f"""Compose an email based on this context: +------- {escaped_context} ----------------- - -RECIPIENT: {to} -EMAIL STYLE: {emailStyle} -MAX LENGTH: {maxLength} characters +------- +Recipients: {to} +Style: {emailStyle} +Max length: {maxLength} characters {doc_list_text} -Please generate: -1. A clear, professional subject line -2. A well-structured email body that addresses the context appropriately -3. Use the {emailStyle} tone throughout -4. 
Decide which documents from Available_Document_References (if any) should be attached to the email +Based on the context, decide which documents to attach. -Return your response in the following JSON format: +Return JSON: {{ - "subject": "Your generated subject line here", - "body": "Your generated email body here (can include HTML formatting like
<br>
for line breaks)", - "attachments": ["document_reference", "document_reference", ...] -}} - -Make sure the email is: -- Professional and appropriate for the context -- Clear and concise -- Well-structured with proper greeting and closing -- Relevant to the provided context -- Include only relevant documents as attachments (use EXACT document references from the Available_Document_References) -""" + "subject": "subject line", + "body": "email body (HTML allowed)", + "attachments": ["doc_ref1", "doc_ref2"] +}}""" # Call AI service to generate email content try: @@ -1231,16 +1226,44 @@ Make sure the email is: return ActionResult.isFailure(error="AI did not generate valid subject and body") # Use AI-selected attachments if provided, otherwise use all documents - if ai_attachments: - # Filter documentList to only include AI-selected attachments - selected_docs = [doc_ref for doc_ref in documentList if doc_ref in ai_attachments] - if selected_docs: - documentList = selected_docs - logger.info(f"AI selected {len(selected_docs)} documents for attachment: {selected_docs}") + if documentList: + try: + available_refs = [documentList] if isinstance(documentList, str) else documentList + available_docs = self.services.workflow.getChatDocumentsFromDocumentList(available_refs) or [] + except Exception: + available_docs = [] + + # Normalize AI attachments to a list of strings + if isinstance(ai_attachments, str): + ai_attachments = [ai_attachments] + elif isinstance(ai_attachments, list): + ai_attachments = [a for a in ai_attachments if isinstance(a, str)] + + if ai_attachments: + try: + ai_refs = [ai_attachments] if isinstance(ai_attachments, str) else ai_attachments + ai_docs = self.services.workflow.getChatDocumentsFromDocumentList(ai_refs) or [] + except Exception: + ai_docs = [] + + # Intersect by document id + available_ids = {getattr(d, 'id', None) for d in available_docs} + selected_docs = [d for d in ai_docs if getattr(d, 'id', None) in available_ids] + + if selected_docs: + # Map selected ChatDocuments back to docItem references + documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in selected_docs] + logger.info(f"AI selected {len(documentList)} documents for attachment (resolved via ChatDocuments)") + else: + # No intersection; use all available documents + documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs] + logger.warning("AI selected attachments not found in available documents, using all documents") else: - logger.warning("AI selected attachments not found in available documents, using all documents") + # No AI selection; use all available documents + documentList = [self.services.workflow.getDocumentReferenceFromChatDocument(d) for d in available_docs] + logger.warning("AI did not specify attachments, using all available documents") else: - logger.info("AI did not specify attachments, using all available documents") + logger.info("No documents provided in documentList; skipping attachment processing") except json.JSONDecodeError as e: logger.error(f"Failed to parse AI response as JSON: {str(e)}") diff --git a/modules/workflows/processing/adaptive/adaptiveLearningEngine.py b/modules/workflows/processing/adaptive/adaptiveLearningEngine.py new file mode 100644 index 00000000..cc21d42a --- /dev/null +++ b/modules/workflows/processing/adaptive/adaptiveLearningEngine.py @@ -0,0 +1,271 @@ +# adaptiveLearningEngine.py +# Enhanced learning engine that tracks validation patterns and adapts prompts + +import json 
+import logging +from typing import Dict, Any, List, Optional +from datetime import datetime, timezone +from collections import defaultdict + +logger = logging.getLogger(__name__) + +class AdaptiveLearningEngine: + """Enhanced learning engine that tracks validation patterns and adapts prompts""" + + def __init__(self): + self.validationHistory = [] # Store validation results with context + self.failurePatterns = defaultdict(list) # Track failure patterns by action type + self.successPatterns = defaultdict(list) # Track success patterns + self.actionAttempts = defaultdict(int) # Track attempt counts per action + self.learningInsights = {} # Store learned insights per workflow + + def recordValidationResult(self, validationResult: Dict[str, Any], actionContext: Dict[str, Any], + workflowId: str, attemptNumber: int): + """Record validation result and learn from it""" + try: + actionType = actionContext.get('actionType', 'unknown') + actionName = actionContext.get('actionName', 'unknown') + + # Store validation history + validationEntry = { + 'workflowId': workflowId, + 'actionType': actionType, + 'actionName': actionName, + 'attemptNumber': attemptNumber, + 'validationResult': validationResult, + 'actionContext': actionContext, + 'timestamp': datetime.now(timezone.utc).isoformat(), + 'success': validationResult.get('overallSuccess', False), + 'qualityScore': validationResult.get('qualityScore', 0.0) + } + + self.validationHistory.append(validationEntry) + + # Track patterns + if validationResult.get('overallSuccess', False): + self.successPatterns[actionType].append(validationEntry) + else: + self.failurePatterns[actionType].append(validationEntry) + + # Update attempt count + self.actionAttempts[f"{workflowId}:{actionType}"] += 1 + + # Generate learning insights + self._generateLearningInsights(workflowId, actionType) + + logger.info(f"Recorded validation for {actionType} (attempt {attemptNumber}): " + f"Success={validationResult.get('overallSuccess', False)}, " + f"Quality={validationResult.get('qualityScore', 0.0)}") + + except Exception as e: + logger.error(f"Error recording validation result: {str(e)}") + + def getAdaptiveContextForActionSelection(self, workflowId: str, userPrompt: str) -> Dict[str, Any]: + """Generate adaptive context for action selection prompt""" + try: + # Get recent validation history for this workflow + recentValidations = [ + v for v in self.validationHistory + if v['workflowId'] == workflowId + ][-5:] # Last 5 validations + + # Analyze failure patterns + failureAnalysis = self._analyzeFailurePatterns(recentValidations) + + # Generate specific guidance for next action + adaptiveGuidance = self._generateActionGuidance(userPrompt, recentValidations, failureAnalysis) + + return { + 'recentValidations': recentValidations, + 'failureAnalysis': failureAnalysis, + 'adaptiveGuidance': adaptiveGuidance, + 'learningInsights': self.learningInsights.get(workflowId, {}), + 'escalationLevel': self._getEscalationLevel(workflowId) + } + + except Exception as e: + logger.error(f"Error generating adaptive context: {str(e)}") + return {} + + def getAdaptiveContextForParameters(self, workflowId: str, actionType: str, + parametersContext: str) -> Dict[str, Any]: + """Generate adaptive context for parameter selection prompt""" + try: + # Get validation history for this specific action type + actionValidations = [ + v for v in self.validationHistory + if v['workflowId'] == workflowId and v['actionType'] == actionType + ][-3:] # Last 3 attempts for this action + + # Analyze what went 
wrong in previous attempts + failureAnalysis = self._analyzeParameterFailures(actionValidations) + + # Generate specific parameter guidance + parameterGuidance = self._generateParameterGuidance(actionType, parametersContext, failureAnalysis) + + return { + 'actionValidations': actionValidations, + 'failureAnalysis': failureAnalysis, + 'parameterGuidance': parameterGuidance, + 'attemptNumber': len(actionValidations) + 1, + 'escalationLevel': self._getEscalationLevel(workflowId) + } + + except Exception as e: + logger.error(f"Error generating parameter context: {str(e)}") + return {} + + def _analyzeFailurePatterns(self, validations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze failure patterns from validation history""" + if not validations: + return {} + + failedValidations = [v for v in validations if not v['success']] + + if not failedValidations: + return {'hasFailures': False} + + # Extract common failure themes + commonIssues = [] + for validation in failedValidations: + issues = validation['validationResult'].get('validationDetails', [{}]) + for issue in issues: + commonIssues.extend(issue.get('issues', [])) + + # Count most common issues + issueCounts = defaultdict(int) + for issue in commonIssues: + issueCounts[issue] += 1 + + return { + 'hasFailures': True, + 'failureCount': len(failedValidations), + 'commonIssues': dict(sorted(issueCounts.items(), key=lambda x: x[1], reverse=True)), + 'lastFailure': failedValidations[-1] if failedValidations else None + } + + def _analyzeParameterFailures(self, validations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze parameter-specific failure patterns""" + if not validations: + return {'hasFailures': False} + + failedValidations = [v for v in validations if not v['success']] + + if not failedValidations: + return {'hasFailures': False} + + # Extract common failure themes + commonIssues = [] + for validation in failedValidations: + issues = validation['validationResult'].get('validationDetails', [{}]) + for issue in issues: + commonIssues.extend(issue.get('issues', [])) + + # Count most common issues + issueCounts = defaultdict(int) + for issue in commonIssues: + issueCounts[issue] += 1 + + return { + 'hasFailures': True, + 'failureCount': len(failedValidations), + 'commonIssues': dict(sorted(issueCounts.items(), key=lambda x: x[1], reverse=True)), + 'lastFailure': failedValidations[-1] if failedValidations else None, + 'attemptNumber': len(validations) + } + + def _generateActionGuidance(self, userPrompt: str, validations: List[Dict[str, Any]], + failureAnalysis: Dict[str, Any]) -> str: + """Generate specific guidance for next action based on learning""" + if not failureAnalysis.get('hasFailures', False): + return "No previous failures detected. Proceed with standard approach." + + guidance_parts = [] + + # Add failure awareness + failureCount = failureAnalysis.get('failureCount', 0) + if failureCount >= 3: + guidance_parts.append("CRITICAL: Multiple previous attempts have failed. Consider alternative approaches.") + elif failureCount >= 1: + guidance_parts.append("WARNING: Previous attempts have failed. 
Learn from validation feedback.") + + # Add specific issue guidance + commonIssues = failureAnalysis.get('commonIssues', {}) + if commonIssues: + guidance_parts.append("COMMON FAILURE PATTERNS:") + for issue, count in list(commonIssues.items())[:3]: # Top 3 issues + guidance_parts.append(f"- {issue} (occurred {count} times)") + + # Add specific action guidance based on user prompt + if "email" in userPrompt.lower() and "outlook" in userPrompt.lower(): + if any("account" in str(issue).lower() for issue in commonIssues.keys()): + guidance_parts.append("SPECIFIC GUIDANCE: Ensure email is sent from the correct account (valueon).") + if any("attachment" in str(issue).lower() for issue in commonIssues.keys()): + guidance_parts.append("SPECIFIC GUIDANCE: Verify PDF attachment is properly included.") + if any("summary" in str(issue).lower() for issue in commonIssues.keys()): + guidance_parts.append("SPECIFIC GUIDANCE: Include German summary in email body.") + + return "\n".join(guidance_parts) if guidance_parts else "No specific guidance available." + + def _generateParameterGuidance(self, actionType: str, parametersContext: str, + failureAnalysis: Dict[str, Any]) -> str: + """Generate specific parameter guidance based on previous failures""" + if not failureAnalysis.get('hasFailures', False): + return "No previous parameter failures. Use standard parameter values." + + guidance_parts = [] + + # Add attempt awareness + attemptNumber = failureAnalysis.get('attemptNumber', 1) + if attemptNumber >= 3: + guidance_parts.append(f"ATTEMPT #{attemptNumber}: Previous attempts failed. Adjust parameters based on validation feedback.") + + # Add specific parameter guidance based on action type + if actionType == "outlook.composeAndSendEmailWithContext": + guidance_parts.append("EMAIL PARAMETER GUIDANCE:") + guidance_parts.append("- context: Be very specific about account (valueon), appointment time (Friday), and requirements") + guidance_parts.append("- emailStyle: Use 'formal' for business emails") + guidance_parts.append("- maxLength: Set to 2000+ for detailed emails with summaries") + + # Add specific guidance based on common failures + commonIssues = failureAnalysis.get('commonIssues', {}) + if any("account" in str(issue).lower() for issue in commonIssues.keys()): + guidance_parts.append("- context: MUST specify 'from valueon account' explicitly") + if any("attachment" in str(issue).lower() for issue in commonIssues.keys()): + guidance_parts.append("- documentList: Ensure PDF is properly referenced") + if any("summary" in str(issue).lower() for issue in commonIssues.keys()): + guidance_parts.append("- context: MUST request '10-12 sentence German summary' explicitly") + + return "\n".join(guidance_parts) if guidance_parts else "Use standard parameter values." 
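    # Usage sketch for this engine (caller wiring is assumed; the engine keeps its
    # state in memory and keys it by workflowId, so one instance per process):
    #
    #     engine = AdaptiveLearningEngine()
    #     engine.recordValidationResult(
    #         validationResult={"overallSuccess": False, "qualityScore": 0.4},
    #         actionContext={"actionType": "outlook.composeAndSendEmailWithContext",
    #                        "actionName": "sendEmail"},
    #         workflowId="wf-123",
    #         attemptNumber=1,
    #     )
    #     ctx = engine.getAdaptiveContextForActionSelection("wf-123", "send the report via outlook")
    #     # ctx["adaptiveGuidance"] now carries the WARNING/CRITICAL text built above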
+ + def _getEscalationLevel(self, workflowId: str) -> str: + """Determine escalation level based on failure patterns""" + workflowValidations = [v for v in self.validationHistory if v['workflowId'] == workflowId] + failedAttempts = len([v for v in workflowValidations if not v['success']]) + + if failedAttempts >= 5: + return "critical" + elif failedAttempts >= 3: + return "high" + elif failedAttempts >= 1: + return "medium" + else: + return "low" + + def _generateLearningInsights(self, workflowId: str, actionType: str): + """Generate learning insights for a workflow""" + if workflowId not in self.learningInsights: + self.learningInsights[workflowId] = {} + + # Analyze patterns for this workflow + workflowValidations = [v for v in self.validationHistory if v['workflowId'] == workflowId] + + insights = { + 'totalAttempts': len(workflowValidations), + 'successfulAttempts': len([v for v in workflowValidations if v['success']]), + 'failedAttempts': len([v for v in workflowValidations if not v['success']]), + 'lastActionType': actionType, + 'escalationLevel': self._getEscalationLevel(workflowId) + } + + self.learningInsights[workflowId] = insights diff --git a/modules/workflows/processing/adaptive/contentValidator.py b/modules/workflows/processing/adaptive/contentValidator.py index 5423dc8e..0aa634c3 100644 --- a/modules/workflows/processing/adaptive/contentValidator.py +++ b/modules/workflows/processing/adaptive/contentValidator.py @@ -11,43 +11,37 @@ logger = logging.getLogger(__name__) class ContentValidator: """Validates delivered content against user intent""" - def __init__(self, services=None): + def __init__(self, services=None, learningEngine=None): self.services = services + self.learningEngine = learningEngine async def validateContent(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]: - """Validates delivered content against user intent using AI""" - try: - # Use AI for comprehensive validation - return await self._validateWithAI(documents, intent) - - except Exception as e: - logger.error(f"Error validating content: {str(e)}") - return self._createFailedValidationResult(str(e)) + """Validates delivered content against user intent using AI (single attempt; parse-or-fail)""" + return await self._validateWithAI(documents, intent) def _extractContent(self, doc: Any) -> str: - """Extracts content from a document""" + """Extracts content from a document with size protection for large documents""" try: if hasattr(doc, 'documentData'): data = doc.documentData if isinstance(data, dict) and 'content' in data: - return str(data['content']) + content = data['content'] + # For large content, check size before converting to string + if hasattr(content, '__len__') and len(str(content)) > 100000: # 100KB threshold + # For very large content, return a size indicator instead + return f"[Large document content - {len(str(content))} characters - truncated for validation]" + return str(content) else: - return str(data) + content = data + # For large content, check size before converting to string + if hasattr(content, '__len__') and len(str(content)) > 100000: # 100KB threshold + return f"[Large document content - {len(str(content))} characters - truncated for validation]" + return str(content) return "" except Exception: return "" - def _createFailedValidationResult(self, error: str) -> Dict[str, Any]: - """Creates a failed validation result in a schema-stable shape""" - return { - "overallSuccess": None, # Unknown when validator itself failed - "qualityScore": None, - 
"validationDetails": [], - "improvementSuggestions": [f"NEXT STEP: Fix validation error - {error}. Check system logs for more details and retry the operation."], - "schemaCompliant": False, - "originalType": "error", - "missingFields": ["overallSuccess", "qualityScore"], - } + # Removed schema fallback creator to keep failures explicit def _isValidJsonResponse(self, response: str) -> bool: """Checks if response contains valid JSON structure""" @@ -62,46 +56,7 @@ class ContentValidator: except: return False - def _extractFallbackValidationResult(self, response: str) -> Dict[str, Any]: - """Extracts a minimal validation result from a malformed AI response (schema-stable)""" - try: - import re - - # Extract key values using regex patterns - overall_success = re.search(r'"overallSuccess"\s*:\s*(true|false)', response, re.IGNORECASE) - quality_score = re.search(r'"qualityScore"\s*:\s*([0-9.]+)', response) - gap_analysis = re.search(r'"gapAnalysis"\s*:\s*"([^"]*)"', response) - - # Determine overall success from context if not found - if not overall_success: - # Look for positive/negative indicators in the text - if any(word in response.lower() for word in ['success', 'complete', 'fulfilled', 'satisfied']): - overall_success = True - elif any(word in response.lower() for word in ['failed', 'incomplete', 'missing', 'error']): - overall_success = False - else: - overall_success = False - - parsed_overall = overall_success if isinstance(overall_success, bool) else (overall_success.group(1).lower() == 'true' if overall_success else None) - parsed_quality = float(quality_score.group(1)) if quality_score else None - - result = { - "overallSuccess": parsed_overall, - "qualityScore": parsed_quality, - "validationDetails": [{ - "documentName": "AI Validation (Fallback)", - "gapAnalysis": gap_analysis.group(1) if gap_analysis else "Unable to parse detailed analysis", - "successCriteriaMet": [] - }], - "improvementSuggestions": ["NEXT STEP: AI response was malformed - retry the operation for better results"], - "schemaCompliant": False, - "originalType": "text", - "missingFields": [k for k, v in {"overallSuccess": parsed_overall, "qualityScore": parsed_quality}.items() if v is None], - } - return result - except Exception as e: - logger.error(f"Fallback extraction failed: {str(e)}") - return None + # Removed text-based fallback extraction to avoid hiding issues async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]: """AI-based comprehensive validation - single main function""" @@ -118,46 +73,46 @@ class ContentValidator: "content": content[:2000] # Limit content for AI processing }) - # Create comprehensive AI validation prompt - validationPrompt = f""" -You are a comprehensive task completion validator. Analyze if the delivered content fulfills the user's request. + # Create structured AI validation prompt + successCriteria = intent.get('successCriteria', []) + criteriaCount = len(successCriteria) + + validationPrompt = f"""TASK VALIDATION -USER REQUEST: {intent.get('primaryGoal', 'Unknown')} -EXPECTED DATA TYPE: {intent.get('dataType', 'unknown')} +USER REQUEST: '{intent.get('primaryGoal', 'Unknown')}' +EXPECTED TYPE: {intent.get('dataType', 'unknown')} EXPECTED FORMAT: {intent.get('expectedFormat', 'unknown')} -SUCCESS CRITERIA: {intent.get('successCriteria', [])} +SUCCESS CRITERIA ({criteriaCount} items): {successCriteria} -DELIVERED CONTENT: +VALIDATION RULES: +1. Check if content matches expected data type +2. Check if content matches expected format +3. 
Verify each success criterion is met +4. Rate overall quality (0.0-1.0) +5. Identify specific gaps +6. Suggest next steps + +OUTPUT FORMAT - JSON ONLY (no prose): +{{ + "overallSuccess": false, + "qualityScore": 0.0, + "dataTypeMatch": false, + "formatMatch": false, + "successCriteriaMet": {[False] * criteriaCount}, + "gapAnalysis": "Specific gaps found", + "improvementSuggestions": ["NEXT STEP: Action 1", "NEXT STEP: Action 2"], + "validationDetails": [ + {{ + "documentName": "Document Name", + "issues": ["Issue 1", "Issue 2"], + "suggestions": ["NEXT STEP: Fix 1", "NEXT STEP: Fix 2"] + }} + ] +}} + +DELIVERED CONTENT TO CHECK: {json.dumps(documentContents, indent=2)} -Perform comprehensive validation: -1. Check if content matches expected data type -2. Check if content matches expected format -3. Verify success criteria are met -4. Assess overall quality and completeness -5. Identify specific gaps and issues -6. Provide actionable next steps - -CRITICAL: You MUST respond with ONLY the JSON object below. NO TEXT ANALYSIS. NO EXPLANATIONS. NO OTHER CONTENT. - -RESPOND WITH THIS EXACT JSON FORMAT: - -{{ - "overallSuccess": false, - "qualityScore": 0.5, - "dataTypeMatch": false, - "formatMatch": false, - "successCriteriaMet": [false, false], - "gapAnalysis": "Content does not match expected format and lacks required elements", - "improvementSuggestions": ["NEXT STEP: Create proper content in expected format", "NEXT STEP: Ensure all success criteria are met"], - "validationDetails": [ - {{ - "documentName": "Content Validation", - "issues": ["Format mismatch", "Missing required elements"], - "suggestions": ["NEXT STEP: Fix format", "NEXT STEP: Add missing elements"] - }} - ] -}} """ # Call AI service for validation @@ -170,56 +125,24 @@ RESPOND WITH THIS EXACT JSON FORMAT: documents=None, options=request_options ) + # Write validation prompt/response to debug + from modules.shared.debugLogger import writeDebugFile + writeDebugFile(validationPrompt, "validation_content_prompt") + writeDebugFile(response or '', "validation_content_response") - # If first attempt fails, try with more explicit prompt - if response and not self._isValidJsonResponse(response): - logger.debug("First AI validation attempt failed, retrying with explicit JSON-only prompt") - explicitPrompt = f""" -VALIDATE AND RETURN JSON ONLY - NO TEXT ANALYSIS + # No retries or correction prompts here; parse-or-fail below -Request: {intent.get('primaryGoal', 'Unknown')} -Data Type: {intent.get('dataType', 'unknown')} -Format: {intent.get('expectedFormat', 'unknown')} -Criteria: {intent.get('successCriteria', [])} - -Content: {json.dumps(documentContents, indent=2)} - -RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT: - -{{ - "overallSuccess": false, - "qualityScore": 0.3, - "dataTypeMatch": false, - "formatMatch": false, - "successCriteriaMet": [false, false], - "gapAnalysis": "Content does not match expected format and lacks required elements", - "improvementSuggestions": ["NEXT STEP: Create proper content in expected format", "NEXT STEP: Ensure all success criteria are met"], - "validationDetails": [ - {{ - "documentName": "Content Validation", - "issues": ["Format mismatch", "Missing required elements"], - "suggestions": ["NEXT STEP: Fix format", "NEXT STEP: Add missing elements"] - }} - ] -}} -""" - response = await self.services.ai.callAi( - prompt=explicitPrompt, - documents=None, - options=request_options - ) - if not response or not response.strip(): logger.warning("AI validation returned empty response") - return 
self._createFailedValidationResult("AI validation failed - empty response") - + raise ValueError("AI validation failed - empty response") + # Clean and extract JSON from response result = response.strip() logger.debug(f"AI validation response length: {len(result)}") - + # Try to find JSON in the response with multiple strategies import re - + # Strategy 1: Look for JSON in markdown code blocks json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', result, re.DOTALL) if json_match: @@ -231,23 +154,15 @@ RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT: if not json_match: # Strategy 3: Look for any JSON object json_match = re.search(r'\{.*\}', result, re.DOTALL) - + if json_match: result = json_match.group(0) logger.debug(f"Extracted JSON directly: {result[:200]}...") else: - logger.debug(f"No JSON found in AI response, trying fallback extraction: {result[:200]}...") + logger.debug(f"No JSON found in AI response: {result[:200]}...") logger.debug(f"Full AI response: {result}") - - # Try fallback extraction for text responses - fallback_result = self._extractFallbackValidationResult(result) - if fallback_result: - logger.info("Using fallback text extraction for validation") - return fallback_result - - logger.warning("All AI validation attempts failed - no JSON found and fallback extraction failed") - return self._createFailedValidationResult("AI validation failed - no JSON in response") - + raise ValueError("AI validation failed - no JSON in response") + try: aiResult = json.loads(result) logger.info("AI validation JSON parsed successfully") @@ -259,7 +174,7 @@ RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT: criteria = aiResult.get("successCriteriaMet") improvements = aiResult.get("improvementSuggestions", []) - # Normalize into schema-stable object without forcing failure defaults + # Normalize while keeping failures explicit normalized = { "overallSuccess": overall if isinstance(overall, bool) else None, "qualityScore": float(quality) if isinstance(quality, (int, float)) else None, @@ -278,26 +193,18 @@ RESPOND WITH THIS EXACT JSON FORMAT - NO OTHER TEXT: normalized["missingFields"].append("overallSuccess") if normalized["qualityScore"] is None: normalized["missingFields"].append("qualityScore") - # If any critical field missing, mark as not fully compliant if normalized["missingFields"]: normalized["schemaCompliant"] = False return normalized - + except json.JSONDecodeError as json_error: - logger.warning(f"All AI validation attempts failed - invalid JSON: {str(json_error)}") + logger.warning(f"AI validation invalid JSON: {str(json_error)}") logger.debug(f"JSON content: {result}") - - # Try to extract key information from malformed response - fallbackResult = self._extractFallbackValidationResult(result) - if fallbackResult: - logger.info("Using fallback validation result from malformed JSON") - return fallbackResult - - return self._createFailedValidationResult(f"AI validation failed - invalid JSON: {str(json_error)}") - - return self._createFailedValidationResult("AI validation failed - no response") - + raise + + raise ValueError("AI validation failed - no response") + except Exception as e: logger.error(f"AI validation failed: {str(e)}") - return self._createFailedValidationResult(f"AI validation error: {str(e)}") \ No newline at end of file + raise \ No newline at end of file diff --git a/modules/workflows/processing/adaptive/intentAnalyzer.py b/modules/workflows/processing/adaptive/intentAnalyzer.py index e7f10cab..f5fd5616 100644 --- 
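A note on the `successCriteriaMet` line in the new prompt above: the example list must be serialized with `json.dumps`, because interpolating the raw Python list into the f-string would render capitalized booleans — invalid JSON that the model is then asked to imitate. A quick check of the two renderings:

import json

criteriaCount = 2
# Python repr is not valid JSON:
assert f"{[False] * criteriaCount}" == "[False, False]"
# json.dumps yields a valid JSON literal for the prompt example:
assert f"{json.dumps([False] * criteriaCount)}" == "[false, false]"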
diff --git a/modules/workflows/processing/adaptive/intentAnalyzer.py b/modules/workflows/processing/adaptive/intentAnalyzer.py
index e7f10cab..f5fd5616 100644
--- a/modules/workflows/processing/adaptive/intentAnalyzer.py
+++ b/modules/workflows/processing/adaptive/intentAnalyzer.py
@@ -14,19 +14,11 @@ class IntentAnalyzer:
         self.services = services
 
     async def analyzeUserIntent(self, userPrompt: str, context: Any) -> Dict[str, Any]:
-        """Analyzes user intent from prompt and context using AI"""
-        try:
-            # Use AI to analyze intent
-            aiAnalysis = await self._analyzeIntentWithAI(userPrompt, context)
-            if aiAnalysis:
-                return aiAnalysis
-
-            # Fallback to basic analysis if AI fails
-            return self._createBasicIntentAnalysis(userPrompt)
-
-        except Exception as e:
-            logger.error(f"Error analyzing user intent: {str(e)}")
-            return self._createDefaultIntentAnalysis(userPrompt)
+        """Analyzes user intent from prompt and context using AI (single attempt, no fallbacks)"""
+        aiAnalysis = await self._analyzeIntentWithAI(userPrompt, context)
+        if not aiAnalysis:
+            raise ValueError("AI intent analysis failed: empty or invalid response")
+        return aiAnalysis
 
     async def _analyzeIntentWithAI(self, userPrompt: str, context: Any) -> Dict[str, Any]:
         """Uses AI to analyze user intent - language-agnostic"""
@@ -68,26 +60,19 @@ CRITICAL: Respond with ONLY the JSON object below. Do not include any explanator
             from modules.datamodels.datamodelAi import AiCallOptions, OperationType
             request_options = AiCallOptions()
             request_options.operationType = OperationType.GENERAL
+            # Write prompt to debug
+            from modules.shared.debugLogger import writeDebugFile
+            writeDebugFile(analysisPrompt, "intent_prompt")
             response = await self.services.ai.callAi(
                 prompt=analysisPrompt,
                 documents=None,
                 options=request_options
             )
+            # Write response to debug
+            writeDebugFile(response or '', "intent_response")
 
-            # If first attempt fails, try with more explicit prompt
-            if response and not self._isValidJsonResponse(response):
-                logger.debug("First AI intent analysis attempt failed, retrying with explicit JSON-only prompt")
-                explicitPrompt = f"""
-{analysisPrompt}
-
-IMPORTANT: You must respond with ONLY valid JSON. No explanations, no analysis, no text before or after. Just the JSON object.
-"""
-                response = await self.services.ai.callAi(
-                    prompt=explicitPrompt,
-                    documents=None,
-                    options=request_options
-                )
+            # No retries or correction prompts here; parse-or-fail below
 
             if not response or not response.strip():
                 logger.warning("AI intent analysis returned empty response")
@@ -113,7 +98,7 @@ IMPORTANT: You must respond with ONLY valid JSON. No explanations, no analysis,
                 json_match = re.search(r'\{.*\}', result, re.DOTALL)
 
             if not json_match:
-                logger.warning(f"All AI intent analysis attempts failed - no JSON found in response: {result[:200]}...")
+                logger.warning(f"AI intent analysis failed - no JSON found in response: {result[:200]}...")
                 logger.debug(f"Full AI response: {result}")
                 return None
 
@@ -126,7 +111,7 @@ IMPORTANT: You must respond with ONLY valid JSON. No explanations, no analysis,
                 return aiResult
 
             except json.JSONDecodeError as json_error:
-                logger.warning(f"All AI intent analysis attempts failed - invalid JSON: {str(json_error)}")
+                logger.warning(f"AI intent analysis invalid JSON: {str(json_error)}")
                 logger.debug(f"JSON content: {result}")
                 return None
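`writeDebugFile` from `modules.shared.debugLogger` is called throughout this patch but its implementation is not part of the diff. A minimal sketch consistent with the call sites — a content string, a short label, and an optional structured payload — might look like this (hypothetical; the real helper may differ in naming and file layout):

import json
import os
from datetime import datetime, UTC

def writeDebugFile(content: str, label: str, data=None, baseDir: str = "./debug") -> None:
    # Hypothetical reconstruction; signature inferred from calls such as
    # writeDebugFile(prompt, "intent_prompt") and writeDebugFile(template, "taskplan_prompt", placeholders).
    os.makedirs(baseDir, exist_ok=True)
    ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S-%f")[:-3]
    path = os.path.join(baseDir, f"{ts}_{label}.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write(content or "")
        if data is not None:
            f.write("\n\n--- data ---\n")
            f.write(json.dumps(data, indent=2, default=str, ensure_ascii=False))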
diff --git a/modules/workflows/processing/adaptive/learningEngine.py b/modules/workflows/processing/adaptive/learningEngine.py
index 2d5836a6..f18d1d17 100644
--- a/modules/workflows/processing/adaptive/learningEngine.py
+++ b/modules/workflows/processing/adaptive/learningEngine.py
@@ -29,8 +29,21 @@ class LearningEngine:
             # Update strategies based on feedback
             self._updateStrategies(feedback, intent)
 
-            logger.info(f"Learning from feedback: {feedback.get('actionAttempted', 'unknown')} - "
-                        f"Quality: {feedback.get('qualityScore', 0):.2f}, Intent Match: {feedback.get('intentMatchScore', 0):.2f}")
+            # Normalize scores for safe logging
+            _qs = feedback.get('qualityScore', 0.0)
+            _im = feedback.get('intentMatchScore', 0.0)
+            try:
+                _qs = float(0.0 if _qs is None else _qs)
+            except Exception:
+                _qs = 0.0
+            try:
+                _im = float(0.0 if _im is None else _im)
+            except Exception:
+                _im = 0.0
+            logger.info(
+                f"Learning from feedback: {feedback.get('actionAttempted', 'unknown')} - "
+                f"Quality: {_qs:.2f}, Intent Match: {_im:.2f}"
+            )
 
         except Exception as e:
             logger.error(f"Error learning from feedback: {str(e)}")
@@ -61,8 +74,17 @@ class LearningEngine:
         """Updates strategies based on feedback"""
         strategyKey = self._getStrategyKey(intent)
         actionAttempted = feedback.get('actionAttempted', 'unknown')
-        qualityScore = feedback.get('qualityScore', 0)
-        intentMatchScore = feedback.get('intentMatchScore', 0)
+        # Coerce possibly None or non-numeric to floats
+        qs_raw = feedback.get('qualityScore', 0.0)
+        im_raw = feedback.get('intentMatchScore', 0.0)
+        try:
+            qualityScore = float(0.0 if qs_raw is None else qs_raw)
+        except Exception:
+            qualityScore = 0.0
+        try:
+            intentMatchScore = float(0.0 if im_raw is None else im_raw)
+        except Exception:
+            intentMatchScore = 0.0
 
         # Get or create strategy
         if strategyKey not in self.strategies:
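The same None-safe float coercion now appears four times across two methods; if it spreads further, a small helper keeps the call sites flat. A suggested refactor, not part of the patch:

def toFloat(value, default: float = 0.0) -> float:
    # None-safe float coercion mirroring the inline try/except blocks above
    try:
        return float(default if value is None else value)
    except (TypeError, ValueError):
        return default

# Equivalent to the inline blocks:
# qualityScore = toFloat(feedback.get('qualityScore'))
# intentMatchScore = toFloat(feedback.get('intentMatchScore'))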
diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py
index 95a41606..1060e954 100644
--- a/modules/workflows/processing/core/actionExecutor.py
+++ b/modules/workflows/processing/core/actionExecutor.py
@@ -156,10 +156,9 @@ class ActionExecutor:
                 action.setError(result.error or "Action execution failed")
                 logger.error(f"Action failed: {result.error}")
 
-                # Create database log entry for action failure
-                self.services.interfaceDbChat.createLog({
-                    "workflowId": workflow.id,
-                    "message": f"❌ **Task {taskNum}**\n\n❌ **Action {actionNum}/{totalActions}** failed: {result.error}",
+                # Create database log entry for action failure (write-through + bind)
+                self.services.workflow.storeLog(workflow, {
+                    "message": f"❌ **Task {taskNum}**\n\n❌ **Action {actionNum}/{totalActions}** failed: {result.error}",
                     "type": "error"
                 })
 
@@ -227,7 +226,7 @@ class ActionExecutor:
             logger.error(f"Error creating action completion message: {str(e)}")
 
     def _writeTraceLog(self, contextText: str, data: Any) -> None:
-        """Write trace data to configured trace file if in debug mode with improved JSON formatting"""
+        """Write trace data and categorized React debug files when in DEBUG level"""
         try:
             import os
             import json
@@ -247,8 +246,41 @@ class ActionExecutor:
             # Ensure log directory exists
             os.makedirs(logDir, exist_ok=True)
 
-            # Create trace file path
+            # Create trace file path (aggregate)
             traceFile = os.path.join(logDir, "log_trace.log")
+
+            # Derive a React-category filename based on context
+            def _reactFileForContext(text: str) -> str:
+                t = (text or "").lower()
+                if "action result" in t:
+                    return "react_action_results.jsonl"
+                if "extraction" in t:
+                    if "prompt" in t:
+                        return "react_extraction_prompts.jsonl"
+                    if "response" in t:
+                        return "react_extraction_responses.jsonl"
+                if "generation" in t:
+                    if "prompt" in t:
+                        return "react_generation_prompts.jsonl"
+                    if "response" in t:
+                        return "react_generation_responses.jsonl"
+                if "render" in t:
+                    if "prompt" in t:
+                        return "react_rendering_prompts.jsonl"
+                    if "response" in t:
+                        return "react_rendering_responses.jsonl"
+                if "validation" in t:
+                    if "prompt" in t:
+                        return "react_validation_prompts.jsonl"
+                    if "response" in t:
+                        return "react_validation_responses.jsonl"
+                return "react_misc.jsonl"
+
+            # Daily suffix for React files
+            dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d")
+            baseReactFile = _reactFileForContext(contextText)
+            name, ext = os.path.splitext(baseReactFile)
+            reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}")
 
             # Format the trace entry with better structure
             timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
@@ -296,6 +328,26 @@ class ActionExecutor:
             # Write to trace file
             with open(traceFile, "a", encoding="utf-8") as f:
                 f.write(traceEntry)
+
+            # Also write a compact JSONL record to the categorized React file
+            try:
+                workflowId = getattr(getattr(self, 'services', None), 'currentWorkflow', None)
+                if hasattr(workflowId, 'id'):
+                    workflowId = workflowId.id
+                # We have action context within executor only sometimes; include when accessible
+                reactRecord = {
+                    "timestamp": timestamp,
+                    "context": contextText,
+                    "workflowId": workflowId,
+                    "round": getattr(getattr(self, 'workflow', None), 'currentRound', None) if hasattr(self, 'workflow') else None,
+                    "task": getattr(getattr(self, 'workflow', None), 'currentTask', None) if hasattr(self, 'workflow') else None,
+                    "action": getattr(getattr(self, 'workflow', None), 'currentAction', None) if hasattr(self, 'workflow') else None,
+                    "data": data
+                }
+                with open(reactFile, "a", encoding="utf-8") as rf:
+                    rf.write(json.dumps(reactRecord, ensure_ascii=False, default=str) + "\n")
+            except Exception:
+                pass
 
         except Exception as e:
             # Don't log trace errors to avoid recursion
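Because each categorized React file is JSON Lines (one record per line, serialized with `default=str`), a day's records can be replayed with a short reader (sketch; the example path is hypothetical):

import json

def readReactRecords(path: str) -> list:
    # Load categorized React debug records, skipping blank lines
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records

# records = readReactRecords("logs/react_action_results_20250101.jsonl")  # hypothetical file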
diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py
index 6b9422d1..1bf75fb7 100644
--- a/modules/workflows/processing/core/messageCreator.py
+++ b/modules/workflows/processing/core/messageCreator.py
@@ -67,10 +67,8 @@ class MessageCreator:
                 "taskProgress": "pending"
             }
 
-            message = self.services.interfaceDbChat.createMessage(messageData)
-            if message:
-                workflow.messages.append(message)
-                logger.info("Task plan message created successfully")
+            message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+            logger.info("Task plan message created successfully")
 
         except Exception as e:
             logger.error(f"Error creating task plan message: {str(e)}")
@@ -103,10 +101,8 @@ class MessageCreator:
             if taskStep.userMessage:
                 taskStartMessage["message"] += f"\n\n💬 {taskStep.userMessage}"
 
-            message = self.services.interfaceDbChat.createMessage(taskStartMessage)
-            if message:
-                workflow.messages.append(message)
-                logger.info(f"Task start message created for task {taskIndex}")
+            message = self.services.workflow.storeMessageWithDocuments(workflow, taskStartMessage, [])
+            logger.info(f"Task start message created for task {taskIndex}")
 
         except Exception as e:
             logger.error(f"Error creating task start message: {str(e)}")
@@ -186,14 +182,9 @@ class MessageCreator:
             logger.info(f"Creating ERROR message: {messageText}")
             logger.info(f"Message data: {messageData}")
 
-            message = self.services.interfaceDbChat.createMessage(messageData)
-            if message:
-                workflow.messages.append(message)
-                logger.info(f"Message created: {action.execMethod}.{action.execAction}")
-                return message
-            else:
-                logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}")
-                return None
+            message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, createdDocuments)
+            logger.info(f"Message created: {action.execMethod}.{action.execAction}")
+            return message
         except Exception as e:
             logger.error(f"Error creating action message: {str(e)}")
             return None
@@ -236,10 +227,8 @@ class MessageCreator:
                 "taskProgress": "success"
             }
 
-            message = self.services.interfaceDbChat.createMessage(taskCompletionMessage)
-            if message:
-                workflow.messages.append(message)
-                logger.info(f"Task completion message created for task {taskIndex}")
+            message = self.services.workflow.storeMessageWithDocuments(workflow, taskCompletionMessage, [])
+            logger.info(f"Task completion message created for task {taskIndex}")
 
         except Exception as e:
             logger.error(f"Error creating task completion message: {str(e)}")
@@ -265,10 +254,8 @@ class MessageCreator:
                 "taskProgress": "retry"
             }
 
-            message = self.services.interfaceDbChat.createMessage(retryMessage)
-            if message:
-                workflow.messages.append(message)
-                logger.info(f"Retry message created for task {taskIndex}")
+            message = self.services.workflow.storeMessageWithDocuments(workflow, retryMessage, [])
+            logger.info(f"Retry message created for task {taskIndex}")
 
         except Exception as e:
             logger.error(f"Error creating retry message: {str(e)}")
@@ -306,10 +293,8 @@ class MessageCreator:
                 "taskProgress": "fail"
             }
 
-            message = self.services.interfaceDbChat.createMessage(messageData)
-            if message:
-                workflow.messages.append(message)
-                logger.info(f"Error message created for task {taskIndex}")
+            message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
+            logger.info(f"Error message created for task {taskIndex}")
 
         except Exception as e:
             logger.error(f"Error creating error message: {str(e)}")
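`storeMessageWithDocuments` is the write-through service method all of these hunks now rely on; its body is not in this diff. A sketch of the contract the call sites imply — persist the message, persist and attach its documents, then bind the result to the in-memory workflow (hypothetical; the real method lives in the workflow service and may differ):

def storeMessageWithDocuments(self, workflow, messageData: dict, documents: list):
    # Hypothetical contract inferred from call sites; not the actual implementation.
    messageData["workflowId"] = workflow.id
    message = self.services.interfaceDbChat.createMessage(messageData)  # persist message
    if message is None:
        raise ValueError("Failed to persist workflow message")
    for doc in documents:
        doc.messageId = message.id
        self.services.interfaceDbChat.createDocument(doc.to_dict())    # persist children
    message.documents = list(documents)
    workflow.messages.append(message)                                   # bind in memory
    return message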
diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py
index d9f52996..dc7c992f 100644
--- a/modules/workflows/processing/core/taskPlanner.py
+++ b/modules/workflows/processing/core/taskPlanner.py
@@ -9,6 +9,7 @@ from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Process
 from modules.workflows.processing.shared.promptGenerationTaskplan import (
     generateTaskPlanningPrompt
 )
+from modules.workflows.processing.adaptive import IntentAnalyzer
 
 logger = logging.getLogger(__name__)
 
@@ -50,11 +51,16 @@ class TaskPlanner:
             # Check workflow status before calling AI service
             self._checkWorkflowStopped(workflow)
 
-            # Create proper context object for task planning
+            # Analyze user intent to obtain cleaned user objective for planning
+            intentAnalyzer = IntentAnalyzer(self.services)
+            intent = await intentAnalyzer.analyzeUserIntent(actualUserPrompt, None)
+            cleanedObjective = intent.get('primaryGoal', actualUserPrompt) if isinstance(intent, dict) else actualUserPrompt
+
+            # Create proper context object for task planning using cleaned intent
             # For task planning, we need to create a minimal TaskStep since TaskContext requires it
             planningTaskStep = TaskStep(
                 id="planning",
-                objective=actualUserPrompt,
+                objective=cleanedObjective,
                 dependencies=[],
                 success_criteria=[],
                 estimated_complexity="medium"
@@ -88,11 +94,9 @@ class TaskPlanner:
             taskPlanningPromptTemplate = bundle.prompt
             placeholders = bundle.placeholders
 
-            # Log task planning prompt sent to AI
-            logger.info("=== TASK PLANNING PROMPT SENT TO AI ===")
-            # Trace task planning prompt
-            self._writeTraceLog("Task Plan Prompt", taskPlanningPromptTemplate)
-            self._writeTraceLog("Task Plan Placeholders", placeholders)
+            # Write task planning prompt to debug
+            from modules.shared.debugLogger import writeDebugFile
+            writeDebugFile(taskPlanningPromptTemplate, "taskplan_prompt", placeholders)
 
             # Centralized AI call: Task planning (quality, detailed) with placeholders
             options = AiCallOptions(
@@ -115,11 +119,8 @@ class TaskPlanner:
             if not prompt:
                 raise ValueError("AI service returned no response for task planning")
 
-            # Log task planning response received
-            logger.info("=== TASK PLANNING AI RESPONSE RECEIVED ===")
-            logger.info(f"Response length: {len(prompt) if prompt else 0}")
-            # Trace task planning response
-            self._writeTraceLog("Task Plan Response", prompt)
+            # Write task planning response to debug
+            writeDebugFile(prompt or '', "taskplan_response")
 
             # Parse task plan response
             try:
@@ -258,76 +259,5 @@ class TaskPlanner:
         return False
 
     def _writeTraceLog(self, contextText: str, data: Any) -> None:
-        """Write trace data to configured trace file if in debug mode with improved JSON formatting"""
-        try:
-            import os
-            import json
-            from datetime import datetime, UTC
-
-            # Only write if logger is in debug mode
-            if logger.level > logging.DEBUG:
-                return
-
-            # Get log directory from configuration
-            logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
-            if not os.path.isabs(logDir):
-                # If relative path, make it relative to the gateway directory
-                gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
-                logDir = os.path.join(gatewayDir, logDir)
-
-            # Ensure log directory exists
-            os.makedirs(logDir, exist_ok=True)
-
-            # Create trace file path
-            traceFile = os.path.join(logDir, "log_trace.log")
-
-            # Format the trace entry with better structure
-            timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
-
-            # Create a structured trace entry
-            traceEntry = f"[{timestamp}] {contextText}\n"
-            traceEntry += "=" * 80 + "\n"
-
-            # Add data if provided with improved formatting
-            if data is not None:
-                try:
-                    if isinstance(data, (dict, list)):
-                        # Format as pretty JSON with better settings
-                        jsonStr = json.dumps(data, indent=2, default=str, ensure_ascii=False, sort_keys=False)
-                        traceEntry += f"JSON Data:\n{jsonStr}\n"
-                    elif isinstance(data, str):
-                        # For string data, try to parse as JSON first, then fall back to plain text
-                        try:
-                            parsed = json.loads(data)
-                            jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False)
-                            traceEntry += f"JSON Data (parsed from string):\n{jsonStr}\n"
-                        except (json.JSONDecodeError, TypeError):
-                            # Not valid JSON, show as plain text with proper line breaks
-                            formatted_data = data.replace('\\n', '\n')
-                            traceEntry += f"Text Data:\n{formatted_data}\n"
-                    else:
-                        # For other types, convert to string and try to parse as JSON
-                        dataStr = str(data)
-                        try:
-                            parsed = json.loads(dataStr)
-                            jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False)
-                            traceEntry += f"JSON Data (parsed from object):\n{jsonStr}\n"
-                        except (json.JSONDecodeError, TypeError):
-                            # Not valid JSON, show as plain text with proper line breaks
-                            formatted_data = dataStr.replace('\\n', '\n')
-                            traceEntry += f"Object Data:\n{formatted_data}\n"
-                except Exception as e:
-                    # Fallback to simple string representation
-                    traceEntry += f"Data (fallback): {str(data)}\n"
-            else:
-                traceEntry += "No data provided\n"
-
-            traceEntry += "=" * 80 + "\n\n"
-
-            # Write to trace file
-            with open(traceFile, "a", encoding="utf-8") as f:
-                f.write(traceEntry)
-
-        except Exception as e:
-            # Don't log trace errors to avoid recursion
-            pass
+        """Disabled extra trace file outputs (per chat debug simplification)."""
+        return
diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py
index fdee8ff7..c1521a71 100644
--- a/modules/workflows/processing/modes/modeActionplan.py
+++ b/modules/workflows/processing/modes/modeActionplan.py
@@ -301,10 +301,8 @@ class ActionplanMode(BaseMode):
             if action.userMessage:
                 actionStartMessage["message"] += f"\n\n💬 {action.userMessage}"
 
-            message = self.services.interfaceDbChat.createMessage(actionStartMessage)
-            if message:
-                workflow.messages.append(message)
-                logger.info(f"Action start message created for action {actionNumber}")
+            self.services.workflow.storeMessageWithDocuments(workflow, actionStartMessage, [])
+            logger.info(f"Action start message created for action {actionNumber}")
 
             # Execute single action
             result = await self.actionExecutor.executeSingleAction(action, workflow, taskStep,
diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py
index 86c25172..0503e56a 100644
--- a/modules/workflows/processing/modes/modeReact.py
+++ b/modules/workflows/processing/modes/modeReact.py
@@ -20,8 +20,10 @@ from modules.workflows.processing.shared.promptGenerationActionsReact import (
     generateReactParametersPrompt,
     generateReactRefinementPrompt
 )
+from modules.shared.debugLogger import writeDebugFile
 from modules.workflows.processing.shared.placeholderFactory import extractReviewContent
 from modules.workflows.processing.adaptive import IntentAnalyzer, ContentValidator, LearningEngine, ProgressTracker
+from modules.workflows.processing.adaptive.adaptiveLearningEngine import AdaptiveLearningEngine
 
 logger = logging.getLogger(__name__)
 
@@ -32,8 +34,9 @@ class ReactMode(BaseMode):
         super().__init__(services, workflow)
         # Initialize adaptive components
         self.intentAnalyzer = IntentAnalyzer(services)
-        self.contentValidator = ContentValidator(services)
         self.learningEngine = LearningEngine()
+        self.adaptiveLearningEngine = AdaptiveLearningEngine()  # New enhanced learning engine
+        self.contentValidator = ContentValidator(services, self.adaptiveLearningEngine)
         self.progressTracker = ProgressTracker()
         self.currentIntent = None
         # Placeholder service no longer used; prompts are generated directly
@@ -109,6 +112,20 @@ class ReactMode(BaseMode):
                     quality_score = 0.0
                 logger.info(f"Content validation: {validationResult['overallSuccess']} (quality: {quality_score:.2f})")
 
+                # NEW: Record validation result for adaptive learning
+                actionContext = {
+                    'actionType': selection.get('action', {}).get('action', 'unknown'),
+                    'actionName': selection.get('action', {}).get('action', 'unknown'),
+                    'workflowId': context.workflow_id
+                }
+
+                self.adaptiveLearningEngine.recordValidationResult(
+                    validationResult,
+                    actionContext,
+                    context.workflow_id,
+                    step
+                )
+
                 # NEW: Learn from feedback
                 feedback = self._collectFeedback(result, validationResult, self.workflowIntent)
                 self.learningEngine.learnFromFeedback(feedback, context, self.workflowIntent)
@@ -132,8 +149,7 @@ class ReactMode(BaseMode):
 
             # Telemetry: simple duration per step
             duration = time.time() - t0
-            self.services.interfaceDbChat.createLog({
-                "workflowId": workflow.id,
+            self.services.workflow.storeLog(workflow, {
                 "message": f"react_step_duration_sec={duration:.3f}",
                 "type": "info"
             })
@@ -177,12 +193,13 @@ class ReactMode(BaseMode):
 
     async def _planSelect(self, context: TaskContext) -> Dict[str, Any]:
         """Plan: select exactly one action. Returns {"action": {method, name}}"""
-        bundle = generateReactPlanSelectionPrompt(self.services, context)
+        bundle = generateReactPlanSelectionPrompt(self.services, context, self.adaptiveLearningEngine)
         promptTemplate = bundle.prompt
         placeholders = bundle.placeholders
 
-        self._writeTraceLog("React Plan Selection Prompt", promptTemplate)
-        self._writeTraceLog("React Plan Selection Placeholders", placeholders)
+        # Write action selection prompt to debug (writeDebugFile is imported at module level)
+        writeDebugFile(promptTemplate, "action_selection_prompt", placeholders)
 
         # Centralized AI call for plan selection (use plan generation quality)
         options = AiCallOptions(
@@ -200,7 +217,8 @@ class ReactMode(BaseMode):
             placeholders=placeholders,
             options=options
         )
-        self._writeTraceLog("React Plan Selection Response", response)
+        # Write action selection response to debug
+        writeDebugFile(response or '', "action_selection_response")
         jsonStart = response.find('{') if response else -1
         jsonEnd = response.rfind('}') + 1 if response else 0
         if jsonStart == -1 or jsonEnd == 0:
@@ -290,12 +308,12 @@ class ReactMode(BaseMode):
         stage2Context.learnings = []
 
         # Build and send the Stage 2 parameters prompt (always)
-        bundle = generateReactParametersPrompt(self.services, stage2Context, compoundActionName)
+        bundle = generateReactParametersPrompt(self.services, stage2Context, compoundActionName, self.adaptiveLearningEngine)
         promptTemplate = bundle.prompt
         placeholders = bundle.placeholders
 
-        self._writeTraceLog("React Parameters Prompt", promptTemplate)
-        self._writeTraceLog("React Parameters Placeholders", placeholders)
+        # Write parameters prompt to debug
+        writeDebugFile(promptTemplate, "parameters_prompt", placeholders)
 
         # Centralized AI call for parameter suggestion (balanced analysis)
         options = AiCallOptions(
@@ -324,7 +342,9 @@ class ReactMode(BaseMode):
         except Exception as e:
             logger.error(f"Failed to parse AI parameters response as JSON: {str(e)}")
             logger.error(f"Response was: {paramsResp}")
-            parameters = {}
+            raise ValueError("AI parameters response invalid JSON")
+        if not isinstance(parameters, dict):
+            raise ValueError("AI parameters response missing 'parameters' object")
 
         # Merge Stage 1 resource selections into Stage 2 parameters (only if action expects them)
         try:
@@ -353,15 +373,12 @@ class ReactMode(BaseMode):
         if 'language' not in parameters and hasattr(self.services, 'user') and getattr(self.services.user, 'language', None):
             parameters['language'] = self.services.user.language
 
-        # Write merged parameters to trace BEFORE continuing
-        try:
-            mergedParamObj = {
-                "schema": (paramObj.get('schema') if isinstance(paramObj, dict) else 'parameters_v1'),
-                "parameters": parameters
-            }
-            self._writeTraceLog("React Parameters Response", mergedParamObj)
-        except Exception:
-            pass
+        # Write parameters response to debug
+        mergedParamObj = {
+            "schema": (paramObj.get('schema') if isinstance(paramObj, dict) else 'parameters_v1'),
+            "parameters": parameters
+        }
+        writeDebugFile(str(mergedParamObj), "parameters_response", mergedParamObj)
 
         # Build a synthetic ActionItem for execution routing and labels
         currentRound = getattr(self.workflow, 'currentRound', 0)
@@ -614,8 +631,8 @@ class ReactMode(BaseMode):
         promptTemplate = bundle.prompt
         placeholders = bundle.placeholders
 
-        self._writeTraceLog("React Refinement Prompt", promptTemplate)
-        self._writeTraceLog("React Refinement Placeholders", placeholders)
+        # Write refinement/validation prompt to debug
+        writeDebugFile(promptTemplate, "validation_refinement_prompt", placeholders)
 
         # Centralized AI call for refinement decision (balanced analysis)
         options = AiCallOptions(
@@ -633,7 +650,8 @@ class ReactMode(BaseMode):
             placeholders=placeholders,
             options=options
         )
-        self._writeTraceLog("React Refinement Response", resp)
+        # Write refinement/validation response to debug
+        writeDebugFile(resp or '', "validation_refinement_response")
         js = resp[resp.find('{'):resp.rfind('}')+1] if resp else '{}'
         try:
             decision = json.loads(js)
@@ -688,9 +706,7 @@ class ReactMode(BaseMode):
                 "actionProgress": actionProgress
             }
 
-            message = self.services.interfaceDbChat.createMessage(messageData)
-            if message:
-                workflow.messages.append(message)
+            self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
 
         except Exception as e:
             logger.error(f"Error creating React action message: {str(e)}")
@@ -909,7 +925,7 @@ Return only the user-friendly message, no technical details."""
         return None
 
     def _writeTraceLog(self, contextText: str, data: Any) -> None:
-        """Write trace data to configured trace file if in debug mode with improved JSON formatting"""
+        """Write trace data and categorized React debug files when in DEBUG level"""
        try:
             import os
             import json
@@ -929,8 +945,63 @@ Return only the user-friendly message, no technical details."""
             # Ensure log directory exists
             os.makedirs(logDir, exist_ok=True)
 
-            # Create trace file path
+            # Create trace file path (aggregate)
             traceFile = os.path.join(logDir, "log_trace.log")
+
+            # Derive a React-category filename based on context
+            def _reactFileForContext(text: str) -> str:
+                t = (text or "").lower()
+                if "task plan" in t:
+                    if "prompt" in t:
+                        return "react_taskplan_prompt.jsonl"
+                    if "response" in t:
+                        return "react_taskplan_response.jsonl"
+                if "plan selection" in t:
+                    if "prompt" in t:
+                        return "react_action_selection_prompt.jsonl"
+                    if "response" in t:
+                        return "react_action_selection_response.jsonl"
+                if "parameters" in t:
+                    if "prompt" in t:
+                        return "react_parameter_setting_prompt.jsonl"
+                    if "response" in t:
+                        return "react_parameter_setting_response.jsonl"
+                if "refinement" in t or "review" in t or "decide" in t:
+                    # Treat refinement as validation/next-step decision context
+                    if "prompt" in t:
+                        return "react_validation_prompt.jsonl"
+                    if "response" in t:
+                        return "react_next_step_decisions.jsonl"
+                if "extraction" in t:
"prompt" in t: + return "react_validation_prompts.jsonl" + if "response" in t: + return "react_validation_responses.jsonl" + if "action result" in t: + return "react_action_results.jsonl" + # Fallback + return "react_misc.jsonl" + + # Daily suffix for React files + dateSuffix = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y%m%d") + baseReactFile = _reactFileForContext(contextText) + name, ext = os.path.splitext(baseReactFile) + reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}") # Format the trace entry with better structure timestamp = datetime.fromtimestamp(self.services.utils.getUtcTimestamp(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] @@ -978,6 +1049,28 @@ Return only the user-friendly message, no technical details.""" # Write to trace file with open(traceFile, "a", encoding="utf-8") as f: f.write(traceEntry) + + # Also write a compact JSONL record to the categorized React file + try: + # Add workflow and step context if available + workflowId = getattr(getattr(self, 'workflow', None), 'id', None) + roundNumber = getattr(getattr(self, 'workflow', None), 'currentRound', None) + taskNumber = getattr(getattr(self, 'workflow', None), 'currentTask', None) + actionNumber = getattr(getattr(self, 'workflow', None), 'currentAction', None) + + reactRecord = { + "timestamp": timestamp, + "context": contextText, + "workflowId": workflowId, + "round": roundNumber, + "task": taskNumber, + "action": actionNumber, + "data": data + } + with open(reactFile, "a", encoding="utf-8") as rf: + rf.write(json.dumps(reactRecord, ensure_ascii=False, default=str) + "\n") + except Exception: + pass except Exception as e: # Don't log trace errors to avoid recursion diff --git a/modules/workflows/processing/shared/promptGenerationActionsReact.py b/modules/workflows/processing/shared/promptGenerationActionsReact.py index 10b7f9f6..56f6aaa5 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsReact.py +++ b/modules/workflows/processing/shared/promptGenerationActionsReact.py @@ -3,6 +3,7 @@ React Mode Prompt Generation Handles prompt templates for react mode action handling. 
""" +import json from typing import Any, List from modules.datamodels.datamodelChat import PromptBundle, PromptPlaceholder from modules.workflows.processing.shared.placeholderFactory import ( @@ -19,7 +20,7 @@ from modules.workflows.processing.shared.placeholderFactory import ( ) from modules.workflows.processing.shared.methodDiscovery import methods, getActionParameterList -def generateReactPlanSelectionPrompt(services, context: Any) -> PromptBundle: +def generateReactPlanSelectionPrompt(services, context: Any, learningEngine=None) -> PromptBundle: """Define placeholders first, then the template; return PromptBundle.""" placeholders: List[PromptPlaceholder] = [ PromptPlaceholder(label="USER_PROMPT", content=extractUserPrompt(context), summaryAllowed=False), @@ -31,6 +32,21 @@ def generateReactPlanSelectionPrompt(services, context: Any) -> PromptBundle: PromptPlaceholder(label="AVAILABLE_DOCUMENTS_INDEX", content=extractAvailableDocumentsIndex(services, context), summaryAllowed=True), PromptPlaceholder(label="AVAILABLE_CONNECTIONS_INDEX", content=extractAvailableConnectionsIndex(services), summaryAllowed=False), ] + + # Add adaptive learning context if available + adaptiveContext = {} + if learningEngine: + workflowId = getattr(context, 'workflow_id', 'unknown') + userPrompt = extractUserPrompt(context) + adaptiveContext = learningEngine.getAdaptiveContextForActionSelection(workflowId, userPrompt) + + if adaptiveContext: + # Add learning-aware placeholders + placeholders.extend([ + PromptPlaceholder(label="ADAPTIVE_GUIDANCE", content=adaptiveContext.get('adaptiveGuidance', ''), summaryAllowed=True), + PromptPlaceholder(label="FAILURE_ANALYSIS", content=json.dumps(adaptiveContext.get('failureAnalysis', {}), indent=2), summaryAllowed=True), + PromptPlaceholder(label="ESCALATION_LEVEL", content=adaptiveContext.get('escalationLevel', 'low'), summaryAllowed=False), + ]) template = """Select exactly one next action to advance the task incrementally. @@ -52,11 +68,26 @@ AVAILABLE_DOCUMENTS_INDEX: AVAILABLE_CONNECTIONS_INDEX: {{KEY:AVAILABLE_CONNECTIONS_INDEX}} +{{#if ADAPTIVE_GUIDANCE}} +LEARNING-BASED GUIDANCE: +{{KEY:ADAPTIVE_GUIDANCE}} + +{{#if FAILURE_ANALYSIS}} +FAILURE ANALYSIS: +{{KEY:FAILURE_ANALYSIS}} +{{/if}} + +ESCALATION LEVEL: {{KEY:ESCALATION_LEVEL}} +{{/if}} + REPLY: Return ONLY a JSON object with the following structure (no comments, no extra text). The chosen action MUST: - be the next logical incremental step toward fulfilling the objective - not attempt to complete the entire objective in one step - if producing files, target exactly one output format for this step - reference ONLY existing document IDs/labels from AVAILABLE_DOCUMENTS_INDEX +{{#if ADAPTIVE_GUIDANCE}} +- learn from previous validation feedback and avoid repeated mistakes +{{/if}} {{ "action": "method.action_name", "actionObjective": "...", @@ -81,11 +112,15 @@ RULES: - Copy references EXACTLY as shown in AVAILABLE_DOCUMENTS_INDEX 6. For requiredConnection, use ONLY an exact label from AVAILABLE_CONNECTIONS_INDEX 7. Plan incrementally: if the overall intent needs multiple output formats (e.g., CSV and HTML), choose one format in this step and leave the other(s) for subsequent steps +{{#if ADAPTIVE_GUIDANCE}} +8. CRITICAL: Learn from previous validation feedback - avoid repeating the same mistakes +9. 
+9. If previous attempts failed, consider alternative approaches or more specific parameters
+{{/if}}
 """
     return PromptBundle(prompt=template, placeholders=placeholders)
 
-def generateReactParametersPrompt(services, context: Any, compoundActionName: str) -> PromptBundle:
+def generateReactParametersPrompt(services, context: Any, compoundActionName: str, learningEngine=None) -> PromptBundle:
     """Define placeholders first, then the template; return PromptBundle.
 
     Minimal Stage 2 (no fallback): consumes actionObjective, selectedAction, parametersContext only.
@@ -166,6 +201,19 @@ Excludes documents/connections/history entirely.
         PromptPlaceholder(label="ACTION_PARAMETERS", content=actionParametersText, summaryAllowed=False),
         PromptPlaceholder(label="LEARNINGS", content=learningsText, summaryAllowed=True),
     ]
+
+    # Add adaptive learning context if available
+    adaptiveContext = {}
+    if learningEngine:
+        workflowId = getattr(context, 'workflow_id', 'unknown')
+        adaptiveContext = learningEngine.getAdaptiveContextForParameters(workflowId, compoundActionName, parametersContext or "")
+
+    if adaptiveContext:
+        placeholders.extend([
+            PromptPlaceholder(label="PARAMETER_GUIDANCE", content=adaptiveContext.get('parameterGuidance', ''), summaryAllowed=True),
+            PromptPlaceholder(label="ATTEMPT_NUMBER", content=str(adaptiveContext.get('attemptNumber', 1)), summaryAllowed=False),
+            PromptPlaceholder(label="FAILURE_ANALYSIS", content=json.dumps(adaptiveContext.get('failureAnalysis', {}), indent=2), summaryAllowed=True),
+        ])
 
     template = """You are a parameter generator. Set the parameters for this specific action.
 
@@ -177,6 +225,19 @@ CONTEXT AND OBJECTIVE:
 SELECTED_ACTION:
 {{KEY:SELECTED_ACTION}}
 
+{{#if PARAMETER_GUIDANCE}}
+LEARNING-BASED PARAMETER GUIDANCE:
+{{KEY:PARAMETER_GUIDANCE}}
+
+{{#if ATTEMPT_NUMBER}}
+ATTEMPT NUMBER: {{KEY:ATTEMPT_NUMBER}}
+{{/if}}
+
+{{#if FAILURE_ANALYSIS}}
+PREVIOUS FAILURE ANALYSIS:
+{{KEY:FAILURE_ANALYSIS}}
+{{/if}}
+{{/if}}
 
 REPLY (ONLY JSON):
 {{
@@ -203,12 +264,19 @@ INSTRUCTIONS:
 - Fill in appropriate values based on the context and objective
 - Do NOT invent new parameters
 - Do NOT include: documentList, connectionReference, history, documents, connections
+{{#if PARAMETER_GUIDANCE}}
+- CRITICAL: Follow the learning-based parameter guidance above
+- Learn from previous validation failures and adjust parameters accordingly
+{{/if}}
 
 RULES:
 - Return ONLY JSON (no markdown, no prose)
 - Use ONLY the exact parameter names listed in REQUIRED PARAMETERS FOR THIS ACTION
 - Do NOT add any parameters not listed above
 - Do NOT add nested objects or custom fields
+{{#if PARAMETER_GUIDANCE}}
+- Apply learning insights to avoid repeated parameter mistakes
+{{/if}}
 """
     return PromptBundle(prompt=template, placeholders=placeholders)
 
@@ -220,26 +288,23 @@ def generateReactRefinementPrompt(services, context: Any, reviewContent: str) ->
         PromptPlaceholder(label="REVIEW_CONTENT", content=reviewContent, summaryAllowed=True),
     ]
 
-    template = """Decide the next step based on the observation.
+    template = """TASK DECISION
 
-OBJECTIVE:
-{{KEY:USER_PROMPT}}
+OBJECTIVE: '{{KEY:USER_PROMPT}}'
 
-OBSERVATION:
-{{KEY:REVIEW_CONTENT}}
+DECISION RULES:
+1. "continue" = objective NOT fulfilled
+2. "stop" = objective fulfilled
+3. Return ONLY JSON - no other text
 
-REPLY: Return only a JSON object with your decision:
+OUTPUT FORMAT (only JSON object to deliver):
 {{
-"decision": "continue|stop",
-"reason": "brief explanation"
+    "decision": "continue",
+    "reason": "Brief reason for decision"
 }}
 
-RULES:
-1. Use "continue" if objective NOT fulfilled
-2. Use "stop" if objective fulfilled
-3. Return ONLY JSON - no other text
-4. Do NOT use markdown code blocks
-5. Do NOT add explanations
+OBSERVATION: {{KEY:REVIEW_CONTENT}}
+
 """
     return PromptBundle(prompt=template, placeholders=placeholders)
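The templates now interleave `{{#if KEY}} ... {{/if}}` sections (including one nested pair) with `{{KEY:...}}` substitutions, and the diff does not show how the prompt renderer strips them when a placeholder is absent. A minimal resolver illustrating the assumed semantics — keep a section when its value is truthy, resolving innermost sections first so nesting works:

import re

_INNERMOST_IF = re.compile(r"\{\{#if (\w+)\}\}((?:(?!\{\{#if ).)*?)\{\{/if\}\}", re.DOTALL)

def resolveConditionalSections(template: str, values: dict) -> str:
    # Sketch of the assumed {{#if}} semantics; the project's real renderer may differ.
    while True:
        resolved = _INNERMOST_IF.sub(
            lambda m: m.group(2) if values.get(m.group(1)) else "", template
        )
        if resolved == template:
            return resolved
        template = resolved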
Use "continue" if objective NOT fulfilled -2. Use "stop" if objective fulfilled -3. Return ONLY JSON - no other text -4. Do NOT use markdown code blocks -5. Do NOT add explanations +OBSERVATION: {{KEY:REVIEW_CONTENT}} + """ return PromptBundle(prompt=template, placeholders=placeholders) diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 7b52ffe9..5c04ba0b 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -3,6 +3,7 @@ import logging from datetime import datetime, UTC import uuid import asyncio +import json from modules.datamodels.datamodelChat import ( UserInputRequest, @@ -38,7 +39,7 @@ class WorkflowManager: if not workflow: raise ValueError(f"Workflow {workflowId} not found") - # Store workflow in services for reference (don't overwrite the workflow service) + # Store workflow in services for reference self.services.currentWorkflow = workflow if workflow.status == "running": @@ -49,14 +50,12 @@ class WorkflowManager: "status": "stopped", "lastActivity": currentTime }) - self.services.workflow.createLog({ - "workflowId": workflowId, - "message": "Workflow stopped for new prompt", - "type": "info", - "status": "stopped", - "progress": 100 - }) - await asyncio.sleep(0.1) + self.services.workflow.storeLog(workflow, { + "message": "Workflow stopped for new prompt", + "type": "info", + "status": "stopped", + "progress": 100 + }) newRound = workflow.currentRound + 1 self.services.workflow.updateWorkflow(workflowId, { @@ -66,27 +65,26 @@ class WorkflowManager: "workflowMode": workflowMode # Update workflow mode for existing workflows }) - workflow = self.services.workflow.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Failed to reload workflow {workflowId} after update") + # Reflect updates on the in-memory object without reloading + workflow.status = "running" + workflow.lastActivity = currentTime + workflow.currentRound = newRound + workflow.workflowMode = workflowMode - self.services.workflow.createLog({ - "workflowId": workflowId, + self.services.workflow.storeLog(workflow, { "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}", "type": "info", "status": "running", "progress": 0 }) - # CRITICAL: Update the workflow object's workflowMode attribute for immediate use - workflow.workflowMode = workflowMode else: workflowData = { "name": "New Workflow", "status": "running", "startedAt": currentTime, "lastActivity": currentTime, - "currentRound": 0, + "currentRound": 1, "currentTask": 0, "currentAction": 0, "totalTasks": 0, @@ -108,11 +106,8 @@ class WorkflowManager: workflow = self.services.workflow.createWorkflow(workflowData) logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") - workflow.currentRound = 1 - self.services.workflow.updateWorkflow(workflow.id, {"currentRound": 1}) self.services.workflow.updateWorkflowStats(workflow.id, bytesSent=0, bytesReceived=0) - # Store workflow in services for reference (don't overwrite the workflow service) self.services.currentWorkflow = workflow # Start workflow processing asynchronously @@ -136,8 +131,7 @@ class WorkflowManager: "status": "stopped", "lastActivity": workflow.lastActivity }) - self.services.workflow.createLog({ - "workflowId": workflowId, + self.services.workflow.storeLog(workflow, { "message": "Workflow stopped", "type": "warning", "status": "stopped", @@ -199,159 +193,141 
@@ class WorkflowManager: "taskProgress": "pending", "actionProgress": "pending" } + + # Clear trace log for new workflow session + self.workflowProcessor.clearTraceLog() - # Create message first to get messageId - message = self.services.workflow.createMessage(messageData) - if message: - workflow.messages.append(message) - - # Clear trace log for new workflow session - self.workflowProcessor.clearTraceLog() - - # Add documents if any, now with messageId - if userInput.listFileId: - # Process file IDs and add to message data - documents = await self._processFileIds(userInput.listFileId, message.id) - message.documents = documents - # Update the message with documents in database - self.services.workflow.updateMessage(message.id, {"documents": [doc.to_dict() for doc in documents]}) + # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents + created_docs = [] - # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents + try: + analyzerPrompt = ( + "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n" + "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n" + "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n" + "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n" + "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n\n" + "Rules:\n" + "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n" + "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n" + "- Preserve critical references (URLs, filenames) in intent.\n" + "- Normalize to the primary detected language if mixed-language.\n\n" + "Return ONLY JSON (no markdown) with this shape:\n" + "{\n" + " \"detectedLanguage\": \"de|en|fr|it|...\",\n" + " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n" + " \"intent\": \"Concise normalized request...\",\n" + " \"contextItems\": [\n" + " {\n" + " \"title\": \"User context 1\",\n" + " \"mimeType\": \"text/plain\",\n" + " \"content\": \"Full extracted content block here\"\n" + " }\n" + " ]\n" + "}\n\n" + f"User message:\n{userInput.prompt}" + ) + + # Call AI analyzer + aiResponse = await self.services.ai.callAi(prompt=analyzerPrompt) + + detectedLanguage = None + normalizedRequest = None + intentText = userInput.prompt + contextItems = [] + + # Parse analyzer response (JSON expected) try: - analyzerPrompt = ( - "You are an input analyzer. 
-                        "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
-                        "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
-                        "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
-                        "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
-                        "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n\n"
-                        "Rules:\n"
-                        "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
-                        "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
-                        "- Preserve critical references (URLs, filenames) in intent.\n"
-                        "- Normalize to the primary detected language if mixed-language.\n\n"
-                        "Return ONLY JSON (no markdown) with this shape:\n"
-                        "{\n"
-                        "  \"detectedLanguage\": \"de|en|fr|it|...\",\n"
-                        "  \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
-                        "  \"intent\": \"Concise normalized request...\",\n"
-                        "  \"contextItems\": [\n"
-                        "    {\n"
-                        "      \"title\": \"User context 1\",\n"
-                        "      \"mimeType\": \"text/plain\",\n"
-                        "      \"content\": \"Full extracted content block here\"\n"
-                        "    }\n"
-                        "  ]\n"
-                        "}\n\n"
-                        f"User message:\n{userInput.prompt}"
-                    )
-
-                    # Call AI analyzer
-                    aiResponse = await self.services.ai.callAi(prompt=analyzerPrompt)
-
-                    detectedLanguage = None
-                    normalizedRequest = None
-                    intentText = userInput.prompt
+                    jsonStart = aiResponse.find('{') if aiResponse else -1
+                    jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
+                    if jsonStart != -1 and jsonEnd > jsonStart:
+                        parsed = json.loads(aiResponse[jsonStart:jsonEnd])
+                        detectedLanguage = parsed.get('detectedLanguage') or None
+                        normalizedRequest = parsed.get('normalizedRequest') or None
+                        if parsed.get('intent'):
+                            intentText = parsed.get('intent')
+                        contextItems = parsed.get('contextItems') or []
+                except Exception:
                     contextItems = []
 
-                    # Parse analyzer response (JSON expected)
+                # Update services state
+                if detectedLanguage and isinstance(detectedLanguage, str):
+                    self._setUserLanguage(detectedLanguage)
                 try:
-                        import json
-                        jsonStart = aiResponse.find('{') if aiResponse else -1
-                        jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
-                        if jsonStart != -1 and jsonEnd > jsonStart:
-                            parsed = json.loads(aiResponse[jsonStart:jsonEnd])
-                            detectedLanguage = parsed.get('detectedLanguage') or None
-                            normalizedRequest = parsed.get('normalizedRequest') or None
-                            if parsed.get('intent'):
-                                intentText = parsed.get('intent')
-                            contextItems = parsed.get('contextItems') or []
+                    setattr(self.services, 'currentUserLanguage', detectedLanguage)
                 except Exception:
-                        contextItems = []
+                    pass
+                self.services.currentUserPrompt = intentText or userInput.prompt
+                try:
+                    if normalizedRequest:
+                        setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
+                    if contextItems is not None:
+                        setattr(self.services, 'currentUserContextItems', contextItems)
+                except Exception:
+                    pass
 
-                # Update services state
-                if detectedLanguage and isinstance(detectedLanguage, str):
-                    self._setUserLanguage(detectedLanguage)
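# The brace-slice parse above (find('{') / rfind('}')) tolerates prose around a
# single top-level JSON object, but if the reply contains two separate objects
# the slice spans both and json.loads fails. Standalone sketch of the pattern:
import json

def extractFirstJsonObject(raw: str):
    start = raw.find('{') if raw else -1
    end = raw.rfind('}') + 1 if raw else 0
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(raw[start:end])
    except json.JSONDecodeError:
        return None

assert extractFirstJsonObject('noise {"intent": "x"} tail') == {"intent": "x"}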
+                # Telemetry (sizes and counts)
+                try:
+                    inputSize = len(userInput.prompt.encode('utf-8')) if userInput and userInput.prompt else 0
+                    outputSize = len(aiResponse.encode('utf-8')) if aiResponse else 0
+                    self.services.workflow.storeLog(workflow, {
+                        "message": f"User prompt analyzed (input {inputSize} bytes, output {outputSize} bytes, items {len(contextItems)})",
+                        "type": "info",
+                        "status": "running",
+                        "progress": 0
+                    })
+                except Exception:
+                    pass
+
+                # Create documents for context items (in-memory ChatDocument; persistence via storeMessageWithDocuments)
+                if contextItems and isinstance(contextItems, list):
+                    for idx, item in enumerate(contextItems):
                         try:
-                    setattr(self.services, 'currentUserLanguage', detectedLanguage)
-                except Exception:
-                    pass
-                self.services.currentUserPrompt = intentText or userInput.prompt
-                try:
-                    if normalizedRequest:
-                        setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
-                    if contextItems is not None:
-                        setattr(self.services, 'currentUserContextItems', contextItems)
-                except Exception:
-                    pass
-
-                # Telemetry (sizes and counts)
-                try:
-                    inputSize = len(userInput.prompt.encode('utf-8')) if userInput and userInput.prompt else 0
-                    outputSize = len(aiResponse.encode('utf-8')) if aiResponse else 0
-                    self.services.workflow.createLog({
-                        "workflowId": workflow.id,
-                        "message": f"User prompt analyzed (input {inputSize} bytes, output {outputSize} bytes, items {len(contextItems)})",
-                        "type": "info",
-                        "status": "running",
-                        "progress": 0
-                    })
-                except Exception:
-                    pass
-
-                # Create and attach documents for context items
-                if contextItems and isinstance(contextItems, list):
-                    created_docs = []
-                    for idx, item in enumerate(contextItems):
-                        try:
-                            title = item.get('title') if isinstance(item, dict) else None
-                            mime = item.get('mimeType') if isinstance(item, dict) else None
-                            content = item.get('content') if isinstance(item, dict) else None
-                            if not content:
-                                continue
-                            fileName = (title or f"user_context_{idx+1}.txt").strip()
-                            mimeType = (mime or "text/plain").strip()
-
-                            # Create file in component storage
-                            content_bytes = content.encode('utf-8')
-                            file_item = self.services.interfaceDbComponent.createFile(
-                                name=fileName,
-                                mimeType=mimeType,
-                                content=content_bytes
-                            )
-                            # Persist file data
-                            self.services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
-
-                            # Collect file info
-                            file_info = self.services.workflow.getFileInfo(file_item.id)
-                            from modules.datamodels.datamodelChat import ChatDocument as _ChatDocument
-                            doc = _ChatDocument(
-                                messageId=message.id,
-                                fileId=file_item.id,
-                                fileName=file_info.get("fileName", fileName) if file_info else fileName,
-                                fileSize=file_info.get("size", len(content_bytes)) if file_info else len(content_bytes),
-                                mimeType=file_info.get("mimeType", mimeType) if file_info else mimeType
-                            )
-                            # Persist document record
-                            self.services.interfaceDbChat.createDocument(doc.to_dict())
-                            created_docs.append(doc)
-                        except Exception:
+                            title = item.get('title') if isinstance(item, dict) else None
+                            mime = item.get('mimeType') if isinstance(item, dict) else None
+                            content = item.get('content') if isinstance(item, dict) else None
+                            if not content:
                                 continue
+                            fileName = (title or f"user_context_{idx+1}.txt").strip()
+                            mimeType = (mime or "text/plain").strip()
-
-                    if created_docs:
-                        # Attach to message and persist
-                        if not message.documents:
-                            message.documents = []
-                        message.documents.extend(created_docs)
-                        self.services.workflow.updateMessage(message.id, {
-                            "documents": [d.to_dict() for d in message.documents],
-                            "documentsLabel": context_label
-                        })
+                            # Create file in component storage
@@ -518,9 +489,7 @@ class WorkflowManager:
                 "taskProgress": "fail",
                 "actionProgress": "fail"
             }
-            message = self.services.workflow.createMessage(error_message)
-            if message:
-                workflow.messages.append(message)
+            self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])

             # Update workflow status to failed
             workflow.status = "failed"
@@ -533,8 +502,7 @@ class WorkflowManager:
             })

             # Add failed log entry
-            self.services.workflow.createLog({
-                "workflowId": workflow.id,
+            self.services.workflow.storeLog(workflow, {
                 "message": "Workflow failed: Unknown error",
                 "type": "error",
                 "status": "failed",
@@ -565,9 +533,7 @@ class WorkflowManager:
                 "taskProgress": "fail",
                 "actionProgress": "fail"
             }
-            message = self.services.workflow.createMessage(error_message)
-            if message:
-                workflow.messages.append(message)
+            self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])

             # Update workflow status to failed
             workflow.status = "failed"
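Each createMessage-plus-append pair above collapses into a single storeMessageWithDocuments(workflow, messageData, documents) call. Its body is not shown in this diff; one plausible shape, sketched from the calls that do appear here (createMessage, createDocument, the messageId binding), with the exact wiring an assumption:

    def storeMessageWithDocuments(self, workflow, messageData: dict, documents: list):
        # Hypothetical sketch -- not the actual service code.
        data = dict(messageData)
        data["workflowId"] = workflow.id
        message = self.createMessage(data)      # assumed existing method
        if message is None:
            return None
        for doc in documents or []:
            doc.messageId = message.id          # bind each document to the new message
            self.services.interfaceDbChat.createDocument(doc.to_dict())
        message.documents = list(documents or [])
        workflow.messages.append(message)       # replaces the appends removed above
        return message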
@@ -610,9 +576,9 @@ class WorkflowManager:
             }

             # Create message using interface
-            message = self.services.workflow.createMessage(messageData)
+            message = self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
             if message:
-                workflow.messages.append(message)
+                pass  # already persisted and bound to the workflow by storeMessageWithDocuments

             # Update workflow status to completed
             workflow.status = "completed"
@@ -625,8 +591,7 @@ class WorkflowManager:
             })

             # Add completion log entry
-            self.services.workflow.createLog({
-                "workflowId": workflow.id,
+            self.services.workflow.storeLog(workflow, {
                 "message": "Workflow completed",
                 "type": "success",
                 "status": "completed",
@@ -696,13 +661,10 @@ class WorkflowManager:
                 "taskProgress": "pending",
                 "actionProgress": "pending"
             }
-            message = self.services.workflow.createMessage(stopped_message)
-            if message:
-                workflow.messages.append(message)
+            self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])

             # Add log entry
-            self.services.workflow.createLog({
-                "workflowId": workflow.id,
+            self.services.workflow.storeLog(workflow, {
                 "message": "Workflow stopped by user",
                 "type": "warning",
                 "status": "stopped",
@@ -741,13 +703,10 @@ class WorkflowManager:
                 "taskProgress": "fail",
                 "actionProgress": "fail"
             }
-            message = self.services.workflow.createMessage(error_message)
-            if message:
-                workflow.messages.append(message)
+            self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])

             # Add error log entry
-            self.services.workflow.createLog({
-                "workflowId": workflow.id,
+            self.services.workflow.storeLog(workflow, {
                 "message": f"Workflow failed: {str(error)}",
                 "type": "error",
                 "status": "failed",
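For reference, the first file's hunk builds its created_docs list by turning each analyzer contextItem into a stored file plus an unbound ChatDocument. Condensed into one helper for readability, using only calls visible in this diff (the helper name itself is illustrative):

    from modules.datamodels.datamodelChat import ChatDocument

    def contextItemToDocument(self, item: dict, idx: int):
        # Returns an unbound ChatDocument, or None for empty items; the
        # messageId is attached later when the first message is persisted.
        content = item.get('content') if isinstance(item, dict) else None
        if not content:
            return None
        fileName = (item.get('title') or f"user_context_{idx + 1}.txt").strip()
        mimeType = (item.get('mimeType') or "text/plain").strip()
        content_bytes = content.encode('utf-8')
        file_item = self.services.interfaceDbComponent.createFile(
            name=fileName, mimeType=mimeType, content=content_bytes
        )
        self.services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
        info = self.services.workflow.getFileInfo(file_item.id) or {}
        return ChatDocument(
            fileId=file_item.id,
            fileName=info.get("fileName", fileName),
            fileSize=info.get("size", len(content_bytes)),
            mimeType=info.get("mimeType", mimeType),
        )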