From 711b8bc50d7aa40b911c042d8aed3e2e0b9b60c2 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Mon, 20 Oct 2025 21:53:31 +0200 Subject: [PATCH] cleaned unused and strreamlines progress ui logging --- modules/services/serviceAi/subCoreAi.py | 6 +- .../serviceAi/subDocumentGeneration.py | 13 +- .../mainServiceGeneration.py | 7 +- .../serviceGeneration/subPromptBuilder.py | 212 ++++++------------ .../serviceWorkflow/mainServiceWorkflow.py | 17 +- modules/shared/progressLogger.py | 4 +- modules/workflows/methods/methodAi.py | 23 +- .../processing/adaptive/progressTracker.py | 2 +- .../processing/core/actionExecutor.py | 128 ----------- .../workflows/processing/core/taskPlanner.py | 5 +- .../processing/modes/modeActionplan.py | 84 ------- .../workflows/processing/modes/modeReact.py | 153 +------------ .../workflows/processing/workflowProcessor.py | 97 +------- 13 files changed, 121 insertions(+), 630 deletions(-) diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py index d581214b..6e8555a6 100644 --- a/modules/services/serviceAi/subCoreAi.py +++ b/modules/services/serviceAi/subCoreAi.py @@ -8,7 +8,11 @@ logger = logging.getLogger(__name__) # Loop instruction texts for different formats LoopInstructionTexts = { - "json": """CRITICAL: You MUST set the "continuation" field in your JSON response. If you cannot complete the full response, deliver the possible part and set "continuation" to a brief description of what still needs to be generated. If you can complete the full response, set "continuation" to null.""", + "json": """ +CRITICAL: +- If content is too long: deliver a valid partial JSON and set "continuation" to briefly describe the remaining content +- If content fits: deliver complete result and set \"continuation\": null +""", # Add more formats here as needed # "xml": "...", # "text": "...", diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py index 9af20b1e..d40f2439 100644 --- a/modules/services/serviceAi/subDocumentGeneration.py +++ b/modules/services/serviceAi/subDocumentGeneration.py @@ -87,14 +87,13 @@ class SubDocumentGeneration: Always processes as multi-file structure internally. """ - # Create progress logger + # Init progress logger workflow = self.services.currentWorkflow - progressLogger = self.services.workflow.createProgressLogger(workflow) operationId = f"docGenUnified_{workflow.id}_{int(time.time())}" try: # Start progress tracking - progressLogger.startOperation( + self.services.workflow.progressLogStart( operationId, "Generate", "Unified Document Generation", @@ -102,7 +101,7 @@ class SubDocumentGeneration: ) # Update progress - generating extraction prompt - progressLogger.updateProgress(operationId, 0.1, "Generating prompt") + self.services.workflow.progressLogUpdate(operationId, 0.1, "Generating prompt") # Write prompt to debug file self.services.utils.writeDebugFile(extractionPrompt, "extraction_prompt", documents) @@ -113,7 +112,7 @@ class SubDocumentGeneration: ) # Update progress - AI processing completed - progressLogger.updateProgress(operationId, 0.6, "Processing done") + self.services.workflow.progressLogUpdate(operationId, 0.6, "Processing done") @@ -132,13 +131,13 @@ class SubDocumentGeneration: logger.warning("Failed to emit raw extraction chat message (unified)") # Complete progress tracking - progressLogger.completeOperation(operationId, True) + self.services.workflow.progressLogFinish(operationId, True) return aiResponse except Exception as e: logger.error(f"Error in unified document processing: {str(e)}") - progressLogger.completeOperation(operationId, False) + self.services.workflow.progressLogFinish(operationId, False) raise def _validateUnifiedResponseStructure(self, response: Dict[str, Any]) -> bool: diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py index f2dc89a0..cbb5cb0f 100644 --- a/modules/services/serviceGeneration/mainServiceGeneration.py +++ b/modules/services/serviceGeneration/mainServiceGeneration.py @@ -359,17 +359,14 @@ class GenerationService: self, outputFormat: str, userPrompt: str, - title: str, - aiService=None + title: str ) -> str: """Get generation prompt for enhancing extracted JSON content.""" from .subPromptBuilder import buildGenerationPrompt return await buildGenerationPrompt( outputFormat=outputFormat, userPrompt=userPrompt, - title=title, - aiService=aiService, - services=self.services + title=title ) diff --git a/modules/services/serviceGeneration/subPromptBuilder.py b/modules/services/serviceGeneration/subPromptBuilder.py index c59909e2..37b97917 100644 --- a/modules/services/serviceGeneration/subPromptBuilder.py +++ b/modules/services/serviceGeneration/subPromptBuilder.py @@ -146,55 +146,9 @@ Extract the ACTUAL CONTENT from the source documents. Do not use placeholder tex async def buildGenerationPrompt( outputFormat: str, userPrompt: str, - title: str, - aiService=None, - services=None + title: str ) -> str: - """Build generic extraction prompt that works for both single and multi-file.""" - - # Use AI to determine the best approach - if aiService: - try: - analysis_prompt = f""" -Analyze this user request and determine the best JSON structure for document extraction. - -User request: "{userPrompt}" - -Respond with JSON only: -{{ - "requires_multi_file": true/false, - "recommended_schema": "single_document|multi_document", - "split_approach": "description of how to organize content", - "file_naming": "suggested naming pattern" -}} - -Consider the user's intent and the most logical way to organize the extracted content. -""" - - from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType - request_options = AiCallOptions() - request_options.operationType = OperationType.GENERAL - - request = AiCallRequest(prompt=analysis_prompt, context="", options=request_options) - response = await aiService.aiObjects.call(request) - - if response and response.content: - import re - - result = response.content.strip() - json_match = re.search(r'\{.*\}', result, re.DOTALL) - if json_match: - result = json_match.group(0) - - analysis = json.loads(result) - - # Use analysis to build appropriate prompt - return await buildAdaptiveExtractionPrompt( - outputFormat, userPrompt, title, analysis, aiService, services - ) - except Exception as e: - services.utils.debugLogToFile(f"Generic prompt analysis failed: {str(e)}", "PROMPT_BUILDER") - + """Build the unified generation prompt using a single JSON template.""" # Always use the proper generation prompt template with LOOP_INSTRUCTION result = f"""Generate structured JSON content for document creation. @@ -202,7 +156,7 @@ USER REQUEST: "{userPrompt}" DOCUMENT TITLE: "{title}" TARGET FORMAT: {outputFormat} -Return ONLY this JSON structure: +Return ONLY valid JSON matching this structure (template below). Do not include any prose before/after. Use this as the single template reference for your output: {{ "metadata": {{ "title": "{title}", @@ -236,6 +190,47 @@ Return ONLY this JSON structure: }} ], "order": 2 + }}, + {{ + "id": "section_3", + "content_type": "table", + "elements": [ + {{ + "headers": ["Column 1", "Column 2", "Column 3"], + "rows": [ + ["R1C1", "R1C2", "R1C3"], + ["R2C1", "R2C2", "R2C3"] + ], + "caption": "Example table" + }} + ], + "order": 3 + }}, + {{ + "id": "section_4", + "content_type": "list", + "elements": [ + {{ + "items": [ + {{ "text": "First item" }}, + {{ "text": "Second item", "subitems": [{{ "text": "Second.1" }}] }}, + {{ "text": "Third item" }} + ], + "list_type": "bullet" + }} + ], + "order": 4 + }}, + {{ + "id": "section_5", + "content_type": "code", + "elements": [ + {{ + "code": "print('Hello World')", + "language": "python" + }} + ], + "order": 5 }} ] }} @@ -244,19 +239,13 @@ Return ONLY this JSON structure: }} RULES: +- Follow the template structure above exactly; emit only one JSON object in the response - Fill sections with content based on the user request - Use appropriate content_type: "heading", "paragraph", "table", "list" -- If content is too long: deliver partial result and set "continuation": "description of remaining content" -- If content fits: deliver complete result and set "continuation": null -- Split large content into multiple sections if needed LOOP_INSTRUCTION """ - # Debug output - if services: - services.utils.debugLogToFile(f"GENERATION PROMPT: Generated successfully", "PROMPT_BUILDER") - return result.strip() async def buildExtractionPrompt( @@ -391,102 +380,29 @@ DO NOT return a schema description - return actual extracted content in the JSON # Combine all parts finalPrompt = f"{genericIntro}\n\n{formatGuidelines}".strip() - # Save extraction prompt to debug file - services.utils.writeDebugFile(finalPrompt, "extraction_prompt") + # Save extraction prompt to debug file - only if debug enabled + try: + debug_enabled = services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False) + if debug_enabled: + import os + from datetime import datetime, UTC + ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + # Use configured log directory instead of hardcoded test-chat + from modules.shared.configuration import APP_CONFIG + logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./") + if not os.path.isabs(logDir): + gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + logDir = os.path.join(gatewayDir, logDir) + debug_root = os.path.join(logDir, 'debug') + os.makedirs(debug_root, exist_ok=True) + with open(os.path.join(debug_root, f"{ts}_extraction_prompt.txt"), "w", encoding="utf-8") as f: + f.write(finalPrompt) + except Exception: + pass return finalPrompt -async def buildGenerationPrompt( - outputFormat: str, - userPrompt: str, - title: str, - aiService=None, - services=None -) -> str: - """ - Use AI to build the generation prompt based on user intent and format requirements. - Focus on what's important for the user and how to structure the content. - """ - if not aiService: - # Fallback if no AI service available - return f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content." - - try: - # Protect userPrompt from injection - safeUserPrompt = userPrompt.replace('"', '\\"').replace("'", "\\'").replace('\n', ' ').replace('\r', ' ') - - # Debug output - services.utils.debugLogToFile(f"GENERATION PROMPT REQUEST: buildGenerationPrompt called with outputFormat='{outputFormat}', title='{title}'", "PROMPT_BUILDER") - - # Return static generation prompt template instead of calling AI - services.utils.debugLogToFile("GENERATION PROMPT REQUEST: Using static template instead of AI call", "PROMPT_BUILDER") - - # Return static generation prompt template - result = f"""Generate structured JSON content for document creation. - -USER REQUEST: "{safeUserPrompt}" -DOCUMENT TITLE: "{title}" -TARGET FORMAT: {outputFormat} - -Return ONLY this JSON structure: -{{ - "metadata": {{ - "title": "{title}", - "splitStrategy": "single_document", - "source_documents": [], - "extraction_method": "ai_generation" - }}, - "documents": [ - {{ - "id": "doc_1", - "title": "{title}", - "filename": "document.{outputFormat}", - "sections": [ - {{ - "id": "section_1", - "content_type": "heading", - "elements": [ - {{ - "level": 1, - "text": "1. SECTION TITLE" - }} - ], - "order": 1 - }}, - {{ - "id": "section_2", - "content_type": "paragraph", - "elements": [ - {{ - "text": "This is the actual content that should be generated." - }} - ], - "order": 2 - }} - ] - }} - ], - "continuation": null -}} - -RULES: -- Fill sections with content based on the user request -- Use appropriate content_type: "heading", "paragraph", "table", "list" -- Split large content into multiple sections if needed - -LOOP_INSTRUCTION -""" - - # Debug output - services.utils.debugLogToFile(f"GENERATION PROMPT: Generated successfully", "PROMPT_BUILDER") - - return result.strip() - - except Exception as e: - # Fallback on any error - preserve user prompt for language instructions - services.utils.debugLogToFile(f"DEBUG: AI generation prompt failed: {str(e)}", "PROMPT_BUILDER") - return f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content. User requirements: {userPrompt}" async def _parseExtractionIntent(userPrompt: str, outputFormat: str, aiService=None, services=None) -> str: diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index 54ddc48f..5eae45c2 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -918,4 +918,19 @@ Please provide a comprehensive summary of this conversation.""" return [] def createProgressLogger(self, workflow) -> ProgressLogger: - return ProgressLogger(self, workflow) \ No newline at end of file + return ProgressLogger(self, workflow) + + def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = ""): + """Wrapper for ProgressLogger.startOperation""" + progressLogger = self.createProgressLogger(self.workflow) + return progressLogger.startOperation(operationId, serviceName, actionName, context) + + def progressLogUpdate(self, operationId: str, progress: float, statusUpdate: str = ""): + """Wrapper for ProgressLogger.updateOperation""" + progressLogger = self.createProgressLogger(self.workflow) + return progressLogger.updateOperation(operationId, progress, statusUpdate) + + def progressLogFinish(self, operationId: str, success: bool = True): + """Wrapper for ProgressLogger.finishOperation""" + progressLogger = self.createProgressLogger(self.workflow) + return progressLogger.finishOperation(operationId, success) \ No newline at end of file diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py index daa573c6..09a66bab 100644 --- a/modules/shared/progressLogger.py +++ b/modules/shared/progressLogger.py @@ -42,7 +42,7 @@ class ProgressLogger: self._logProgress(operationId, 0.0, f"Starting {actionName}") logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}") - def updateProgress(self, operationId: str, progress: float, statusUpdate: str = ""): + def updateOperation(self, operationId: str, progress: float, statusUpdate: str = ""): """Update progress for an operation. Args: @@ -59,7 +59,7 @@ class ProgressLogger: self._logProgress(operationId, progress, context) logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}") - def completeOperation(self, operationId: str, success: bool = True): + def finishOperation(self, operationId: str, success: bool = True): """Complete an operation. Args: diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index b2c7e526..738a4f36 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -3,6 +3,7 @@ AI processing method module. Handles direct AI calls for any type of task. """ +import time import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC @@ -48,13 +49,11 @@ class MethodAi(MethodBase): - requiredTags (list, optional): Capability tags (e.g., text, chat, reasoning, analysis, image, vision, web, search). """ try: - # Create progress logger - import time - progressLogger = self.services.workflow.createProgressLogger(self.services.currentWorkflow) + # Init progress logger operationId = f"ai_process_{self.services.currentWorkflow.id}_{int(time.time())}" # Start progress tracking - progressLogger.startOperation( + self.services.workflow.progressLogStart( operationId, "Generate", "AI Processing", @@ -70,7 +69,7 @@ class MethodAi(MethodBase): logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})") # Update progress - preparing parameters - progressLogger.updateProgress(operationId, 0.2, "Preparing parameters") + self.services.workflow.progressLogUpdate(operationId, 0.2, "Preparing parameters") documentList = parameters.get("documentList", []) if isinstance(documentList, str): @@ -97,7 +96,7 @@ class MethodAi(MethodBase): logger.info(f"Using result type: {resultType} -> {output_extension}") # Update progress - preparing documents - progressLogger.updateProgress(operationId, 0.3, "Preparing documents") + self.services.workflow.progressLogUpdate(operationId, 0.3, "Preparing documents") # Get ChatDocuments for AI service - let AI service handle all document processing chatDocuments = [] @@ -107,7 +106,7 @@ class MethodAi(MethodBase): logger.info(f"Prepared {len(chatDocuments)} documents for AI processing") # Update progress - preparing AI call - progressLogger.updateProgress(operationId, 0.4, "Preparing AI call") + self.services.workflow.progressLogUpdate(operationId, 0.4, "Preparing AI call") # Build options and delegate document handling to AI/Extraction/Generation services output_format = output_extension.replace('.', '') or 'txt' @@ -125,7 +124,7 @@ class MethodAi(MethodBase): ) # Update progress - calling AI - progressLogger.updateProgress(operationId, 0.6, "Calling AI") + self.services.workflow.progressLogUpdate(operationId, 0.6, "Calling AI") result = await self.services.ai.callAiDocuments( prompt=aiPrompt, # Use original prompt, let unified generation handle prompt building @@ -135,7 +134,7 @@ class MethodAi(MethodBase): ) # Update progress - processing result - progressLogger.updateProgress(operationId, 0.8, "Processing result") + self.services.workflow.progressLogUpdate(operationId, 0.8, "Processing result") from modules.datamodels.datamodelChat import ActionDocument @@ -149,7 +148,7 @@ class MethodAi(MethodBase): )) # Complete progress tracking - progressLogger.completeOperation(operationId, True) + self.services.workflow.progressLogFinish(operationId, True) return ActionResult.isSuccess(documents=action_documents) @@ -166,7 +165,7 @@ class MethodAi(MethodBase): ) # Complete progress tracking - progressLogger.completeOperation(operationId, True) + self.services.workflow.progressLogFinish(operationId, True) return ActionResult.isSuccess(documents=[action_document]) @@ -175,7 +174,7 @@ class MethodAi(MethodBase): # Complete progress tracking with failure try: - progressLogger.completeOperation(operationId, False) + self.services.workflow.progressLogFinish(operationId, False) except: pass # Don't fail on progress logging errors diff --git a/modules/workflows/processing/adaptive/progressTracker.py b/modules/workflows/processing/adaptive/progressTracker.py index b5a41533..69444e7f 100644 --- a/modules/workflows/processing/adaptive/progressTracker.py +++ b/modules/workflows/processing/adaptive/progressTracker.py @@ -17,7 +17,7 @@ class ProgressTracker: self.learningInsights = [] self.currentPhase = "planning" - def updateProgress(self, result: Any, validation: Dict[str, Any], intent: Dict[str, Any]): + def updateOperation(self, result: Any, validation: Dict[str, Any], intent: Dict[str, Any]): """Updates progress tracking based on action result""" try: schemaCompliant = validation.get('schemaCompliant', True) diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py index 19d08164..3dbe0d35 100644 --- a/modules/workflows/processing/core/actionExecutor.py +++ b/modules/workflows/processing/core/actionExecutor.py @@ -131,7 +131,6 @@ class ActionExecutor: docMetadata = {k: v for k, v in docMetadata.items() if v != 'Unknown'} actionResultTrace["documents"].append(docMetadata) - self._writeTraceLog("Action Result", actionResultTrace) # Process action result if result.success: @@ -226,130 +225,3 @@ class ActionExecutor: except Exception as e: logger.error(f"Error creating action completion message: {str(e)}") - def _writeTraceLog(self, contextText: str, data: Any) -> None: - """Write trace data and categorized React debug files when in DEBUG level""" - try: - import os - import json - from datetime import datetime, UTC - - # Only write if logger is in debug mode - if logger.level > logging.DEBUG: - return - - # Get log directory from configuration - logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - logDir = os.path.join(gatewayDir, logDir) - - # Ensure log directory exists - os.makedirs(logDir, exist_ok=True) - - # Create trace file path (aggregate) - traceFile = os.path.join(logDir, "log_trace.log") - - # Derive a React-category filename based on context - def _reactFileForContext(text: str) -> str: - t = (text or "").lower() - if "action result" in t: - return "react_action_results.jsonl" - if "extraction" in t: - if "prompt" in t: - return "react_extraction_prompts.jsonl" - if "response" in t: - return "react_extraction_responses.jsonl" - if "generation" in t: - if "prompt" in t: - return "react_generation_prompts.jsonl" - if "response" in t: - return "react_generation_responses.jsonl" - if "render" in t: - if "prompt" in t: - return "react_rendering_prompts.jsonl" - if "response" in t: - return "react_rendering_responses.jsonl" - if "validation" in t: - if "prompt" in t: - return "react_validation_prompts.jsonl" - if "response" in t: - return "react_validation_responses.jsonl" - return "react_misc.jsonl" - - # Daily suffix for React files - dateSuffix = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d") - baseReactFile = _reactFileForContext(contextText) - name, ext = os.path.splitext(baseReactFile) - reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}") - - # Format the trace entry with better structure - timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - # Create a structured trace entry - traceEntry = f"[{timestamp}] {contextText}\n" - traceEntry += "=" * 80 + "\n" - - # Add data if provided with improved formatting - if data is not None: - try: - if isinstance(data, (dict, list)): - # Format as pretty JSON with better settings - jsonStr = json.dumps(data, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data:\n{jsonStr}\n" - elif isinstance(data, str): - # For string data, try to parse as JSON first, then fall back to plain text - try: - parsed = json.loads(data) - jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data (parsed from string):\n{jsonStr}\n" - except (json.JSONDecodeError, TypeError): - # Not valid JSON, show as plain text with proper line breaks - formatted_data = data.replace('\\n', '\n') - traceEntry += f"Text Data:\n{formatted_data}\n" - else: - # For other types, convert to string and try to parse as JSON - dataStr = str(data) - try: - parsed = json.loads(dataStr) - jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data (parsed from object):\n{jsonStr}\n" - except (json.JSONDecodeError, TypeError): - # Not valid JSON, show as plain text with proper line breaks - formatted_data = dataStr.replace('\\n', '\n') - traceEntry += f"Object Data:\n{formatted_data}\n" - except Exception as e: - # Fallback to simple string representation - traceEntry += f"Data (fallback): {str(data)}\n" - else: - traceEntry += "No data provided\n" - - traceEntry += "=" * 80 + "\n\n" - - # Write to trace file - with open(traceFile, "a", encoding="utf-8") as f: - f.write(traceEntry) - - # Also write a compact JSONL record to the categorized React file - try: - workflowId = getattr(getattr(self, 'services', None), 'currentWorkflow', None) - if hasattr(workflowId, 'id'): - workflowId = workflowId.id - # We have action context within executor only sometimes; include when accessible - reactRecord = { - "timestamp": timestamp, - "context": contextText, - "workflowId": workflowId, - "round": getattr(getattr(self, 'workflow', None), 'currentRound', None) if hasattr(self, 'workflow') else None, - "task": getattr(getattr(self, 'workflow', None), 'currentTask', None) if hasattr(self, 'workflow') else None, - "action": getattr(getattr(self, 'workflow', None), 'currentAction', None) if hasattr(self, 'workflow') else None, - "data": data - } - with open(reactFile, "a", encoding="utf-8") as rf: - rf.write(json.dumps(reactRecord, ensure_ascii=False, default=str) + "\n") - except Exception: - pass - - except Exception as e: - # Don't log trace errors to avoid recursion - pass diff --git a/modules/workflows/processing/core/taskPlanner.py b/modules/workflows/processing/core/taskPlanner.py index 2e62daa1..41f7f851 100644 --- a/modules/workflows/processing/core/taskPlanner.py +++ b/modules/workflows/processing/core/taskPlanner.py @@ -235,7 +235,4 @@ class TaskPlanner: except Exception as e: logger.error(f"Error validating task plan: {str(e)}") return False - - def _writeTraceLog(self, contextText: str, data: Any) -> None: - """Disabled extra trace file outputs (per chat debug simplification).""" - return + \ No newline at end of file diff --git a/modules/workflows/processing/modes/modeActionplan.py b/modules/workflows/processing/modes/modeActionplan.py index 51d488a5..bbea997d 100644 --- a/modules/workflows/processing/modes/modeActionplan.py +++ b/modules/workflows/processing/modes/modeActionplan.py @@ -122,9 +122,6 @@ class ActionplanMode(BaseMode): actionPromptTemplate = bundle.prompt placeholders = bundle.placeholders - # Trace action planning prompt - self._writeTraceLog("Action Plan Prompt", actionPromptTemplate) - self._writeTraceLog("Action Plan Placeholders", placeholders) # Centralized AI call: Action planning (quality, detailed) with placeholders options = AiCallOptions( @@ -146,8 +143,6 @@ class ActionplanMode(BaseMode): # Log action response received logger.info("=== ACTION PLAN AI RESPONSE RECEIVED ===") logger.info(f"Response length: {len(prompt) if prompt else 0}") - # Trace action planning response - self._writeTraceLog("Action Plan Response", prompt) # Parse action response jsonStart = prompt.find('{') @@ -461,9 +456,6 @@ class ActionplanMode(BaseMode): logger.info(f"Task: {taskStep.objective}") logger.info(f"Action Results Count: {len(reviewContext.action_results) if reviewContext.action_results else 0}") logger.info(f"Task Actions Count: {len(reviewContext.task_actions) if reviewContext.task_actions else 0}") - # Trace result review prompt - self._writeTraceLog("Result Review Prompt", promptTemplate) - self._writeTraceLog("Result Review Placeholders", placeholders) # Centralized AI call: Result validation (balanced analysis) with placeholders options = AiCallOptions( @@ -481,8 +473,6 @@ class ActionplanMode(BaseMode): # Log result review response received logger.info("=== RESULT REVIEW AI RESPONSE RECEIVED ===") logger.info(f"Response length: {len(response) if response else 0}") - # Trace result review response - self._writeTraceLog("Result Review Response", response) # Parse review response jsonStart = response.find('{') @@ -755,77 +745,3 @@ class ActionplanMode(BaseMode): logger.error(f"Error creating task action: {str(e)}") return None - def _writeTraceLog(self, contextText: str, data: Any) -> None: - """Write trace data to configured trace file if in debug mode with improved JSON formatting""" - try: - import os - import json - from datetime import datetime, UTC - - # Only write if logger is in debug mode - if logger.level > logging.DEBUG: - return - - # Get log directory from configuration - logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - logDir = os.path.join(gatewayDir, logDir) - - # Ensure log directory exists - os.makedirs(logDir, exist_ok=True) - - # Create trace file path - traceFile = os.path.join(logDir, "log_trace.log") - - # Format the trace entry with better structure - timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - # Create a structured trace entry - traceEntry = f"[{timestamp}] {contextText}\n" - traceEntry += "=" * 80 + "\n" - - # Add data if provided with improved formatting - if data is not None: - try: - if isinstance(data, (dict, list)): - # Format as pretty JSON with better settings - jsonStr = json.dumps(data, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data:\n{jsonStr}\n" - elif isinstance(data, str): - # For string data, try to parse as JSON first, then fall back to plain text - try: - parsed = json.loads(data) - jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data (parsed from string):\n{jsonStr}\n" - except (json.JSONDecodeError, TypeError): - # Not valid JSON, show as plain text with proper line breaks - formatted_data = data.replace('\\n', '\n') - traceEntry += f"Text Data:\n{formatted_data}\n" - else: - # For other types, convert to string and try to parse as JSON - dataStr = str(data) - try: - parsed = json.loads(dataStr) - jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data (parsed from object):\n{jsonStr}\n" - except (json.JSONDecodeError, TypeError): - # Not valid JSON, show as plain text with proper line breaks - formatted_data = dataStr.replace('\\n', '\n') - traceEntry += f"Object Data:\n{formatted_data}\n" - except Exception as e: - # Fallback to simple string representation - traceEntry += f"Data (fallback): {str(data)}\n" - else: - traceEntry += "No data provided\n" - - traceEntry += "=" * 80 + "\n\n" - - # Write to trace file - with open(traceFile, "a", encoding="utf-8") as f: - f.write(traceEntry) - - except Exception as e: - # Don't log trace errors to avoid recursion - pass diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py index 2918ebba..9aea47ea 100644 --- a/modules/workflows/processing/modes/modeReact.py +++ b/modules/workflows/processing/modes/modeReact.py @@ -130,7 +130,7 @@ class ReactMode(BaseMode): self.learningEngine.learnFromFeedback(feedback, context, self.workflowIntent) # NEW: Update progress - self.progressTracker.updateProgress(result, validationResult, self.workflowIntent) + self.progressTracker.updateOperation(result, validationResult, self.workflowIntent) decision = await self._refineDecide(context, observation) @@ -922,155 +922,4 @@ Return only the user-friendly message, no technical details.""" logger.error(f"Error creating task action: {str(e)}") return None - def _writeTraceLog(self, contextText: str, data: Any) -> None: - """Write trace data and categorized React debug files when in DEBUG level""" - try: - import os - import json - from datetime import datetime, UTC - - # Only write if logger is in debug mode - if logger.level > logging.DEBUG: - return - - # Get log directory from configuration - logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - logDir = os.path.join(gatewayDir, logDir) - - # Ensure log directory exists - os.makedirs(logDir, exist_ok=True) - - # Create trace file path (aggregate) - traceFile = os.path.join(logDir, "log_trace.log") - - # Derive a React-category filename based on context - def _reactFileForContext(text: str) -> str: - t = (text or "").lower() - if "task plan" in t: - if "prompt" in t: - return "react_taskplan_prompt.jsonl" - if "response" in t: - return "react_taskplan_response.jsonl" - if "plan selection" in t: - if "prompt" in t: - return "react_action_selection_prompt.jsonl" - if "response" in t: - return "react_action_selection_response.jsonl" - if "parameters" in t: - if "prompt" in t: - return "react_parameter_setting_prompt.jsonl" - if "response" in t: - return "react_parameter_setting_response.jsonl" - if "refinement" in t or "review" in t or "decide" in t: - # Treat refinement as validation/next-step decision context - if "prompt" in t: - return "react_validation_prompt.jsonl" - if "response" in t: - return "react_next_step_decisions.jsonl" - if "extraction" in t: - if "prompt" in t: - return "react_extraction_prompts.jsonl" - if "response" in t: - return "react_extraction_responses.jsonl" - if "generation" in t: - if "prompt" in t: - return "react_generation_prompts.jsonl" - if "response" in t: - return "react_generation_responses.jsonl" - if "render" in t: - if "prompt" in t: - return "react_rendering_prompts.jsonl" - if "response" in t: - return "react_rendering_responses.jsonl" - if "validation" in t: - if "prompt" in t: - return "react_validation_prompts.jsonl" - if "response" in t: - return "react_validation_responses.jsonl" - if "action result" in t: - return "react_action_results.jsonl" - # Fallback - return "react_misc.jsonl" - - # Daily suffix for React files - dateSuffix = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y%m%d") - baseReactFile = _reactFileForContext(contextText) - name, ext = os.path.splitext(baseReactFile) - reactFile = os.path.join(logDir, f"{name}_{dateSuffix}{ext}") - - # Format the trace entry with better structure - timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - # Create a structured trace entry - traceEntry = f"[{timestamp}] {contextText}\n" - traceEntry += "=" * 80 + "\n" - - # Add data if provided with improved formatting - if data is not None: - try: - if isinstance(data, (dict, list)): - # Format as pretty JSON with better settings - jsonStr = json.dumps(data, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data:\n{jsonStr}\n" - elif isinstance(data, str): - # For string data, try to parse as JSON first, then fall back to plain text - try: - parsed = json.loads(data) - jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data (parsed from string):\n{jsonStr}\n" - except (json.JSONDecodeError, TypeError): - # Not valid JSON, show as plain text with proper line breaks - formatted_data = data.replace('\\n', '\n') - traceEntry += f"Text Data:\n{formatted_data}\n" - else: - # For other types, convert to string and try to parse as JSON - dataStr = str(data) - try: - parsed = json.loads(dataStr) - jsonStr = json.dumps(parsed, indent=2, default=str, ensure_ascii=False, sort_keys=False) - traceEntry += f"JSON Data (parsed from object):\n{jsonStr}\n" - except (json.JSONDecodeError, TypeError): - # Not valid JSON, show as plain text with proper line breaks - formatted_data = dataStr.replace('\\n', '\n') - traceEntry += f"Object Data:\n{formatted_data}\n" - except Exception as e: - # Fallback to simple string representation - traceEntry += f"Data (fallback): {str(data)}\n" - else: - traceEntry += "No data provided\n" - - traceEntry += "=" * 80 + "\n\n" - - # Write to trace file - with open(traceFile, "a", encoding="utf-8") as f: - f.write(traceEntry) - - # Also write a compact JSONL record to the categorized React file - try: - # Add workflow and step context if available - workflowId = getattr(getattr(self, 'workflow', None), 'id', None) - roundNumber = getattr(getattr(self, 'workflow', None), 'currentRound', None) - taskNumber = getattr(getattr(self, 'workflow', None), 'currentTask', None) - actionNumber = getattr(getattr(self, 'workflow', None), 'currentAction', None) - - reactRecord = { - "timestamp": timestamp, - "context": contextText, - "workflowId": workflowId, - "round": roundNumber, - "task": taskNumber, - "action": actionNumber, - "data": data - } - with open(reactFile, "a", encoding="utf-8") as rf: - rf.write(json.dumps(reactRecord, ensure_ascii=False, default=str) + "\n") - except Exception: - pass - - except Exception as e: - # Don't log trace errors to avoid recursion - pass diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 13da0519..a80dba87 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -49,8 +49,7 @@ class WorkflowProcessor: """Generate a high-level task plan for the workflow""" import time - # Create progress logger - progressLogger = self.services.workflow.createProgressLogger(workflow) + # Init progress logger operationId = f"taskPlan_{workflow.id}_{int(time.time())}" try: @@ -58,7 +57,7 @@ class WorkflowProcessor: self._checkWorkflowStopped(workflow) # Start progress tracking - progressLogger.startOperation( + self.services.workflow.progressLogStart( operationId, "Workflow Planning", "Task Plan Generation", @@ -74,25 +73,25 @@ class WorkflowProcessor: logger.info(f"Workflow Mode: {workflow.workflowMode}") # Update progress - generating task plan - progressLogger.updateProgress(operationId, 0.3, "Analyzing input") + self.services.workflow.progressLogUpdate(operationId, 0.3, "Analyzing input") # Delegate to the appropriate mode taskPlan = await self.mode.generateTaskPlan(userInput, workflow) # Update progress - creating task plan message - progressLogger.updateProgress(operationId, 0.8, "Creating plan") + self.services.workflow.progressLogUpdate(operationId, 0.8, "Creating plan") # Create task plan message await self.mode.createTaskPlanMessage(taskPlan, workflow) # Complete progress tracking - progressLogger.completeOperation(operationId, True) + self.services.workflow.progressLogFinish(operationId, True) return taskPlan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") # Complete progress tracking with failure - progressLogger.completeOperation(operationId, False) + self.services.workflow.progressLogFinish(operationId, False) raise async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, @@ -100,8 +99,7 @@ class WorkflowProcessor: """Execute a task step using the appropriate mode""" import time - # Create progress logger - progressLogger = self.services.workflow.createProgressLogger(workflow) + # Init progress logger operationId = f"taskExec_{workflow.id}_{taskIndex}_{int(time.time())}" try: @@ -109,7 +107,7 @@ class WorkflowProcessor: self._checkWorkflowStopped(workflow) # Start progress tracking - progressLogger.startOperation( + self.services.workflow.progressLogStart( operationId, "Workflow Execution", "Task Execution", @@ -121,19 +119,19 @@ class WorkflowProcessor: logger.info(f"Mode: {workflow.workflowMode}") # Update progress - executing task - progressLogger.updateProgress(operationId, 0.2, "Executing") + self.services.workflow.progressLogUpdate(operationId, 0.2, "Executing") # Delegate to the appropriate mode result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks) # Complete progress tracking - progressLogger.completeOperation(operationId, True) + self.services.workflow.progressLogFinish(operationId, True) return result except Exception as e: logger.error(f"Error in executeTask: {str(e)}") # Complete progress tracking with failure - progressLogger.completeOperation(operationId, False) + self.services.workflow.progressLogFinish(operationId, False) raise async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow, @@ -276,78 +274,7 @@ class WorkflowProcessor: except Exception as e: logger.error(f"Error resetting workflow for new session: {str(e)}") - def writeTraceLog(self, contextText: str, data: Any) -> None: - """Write trace data to configured trace file if in debug mode""" - try: - import os - import json - from datetime import datetime, UTC - - # Only write if logger is in debug mode - if logger.level > logging.DEBUG: - return - - # Get log directory from configuration - logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - - # Ensure log directory exists - os.makedirs(logDir, exist_ok=True) - - # Create trace file path - traceFile = os.path.join(logDir, "log_trace.log") - - # Format the trace entry - timestamp = datetime.fromtimestamp(self.services.utils.timestampGetUtc(), UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - traceEntry = f"[{timestamp}] {contextText}\n" - - # Add data if provided - show full content without truncation - if data is not None: - if isinstance(data, (dict, list)): - # Use ensure_ascii=False to preserve Unicode characters and indent=2 for readability - traceEntry += f"Data: {json.dumps(data, indent=2, default=str, ensure_ascii=False)}\n" - else: - # For string data, show full content without truncation - traceEntry += f"Data: {str(data)}\n" - - traceEntry += "-" * 80 + "\n\n" - - # Write to trace file - with open(traceFile, "a", encoding="utf-8") as f: - f.write(traceEntry) - - except Exception as e: - # Don't log trace errors to avoid recursion - pass - - def clearTraceLog(self) -> None: - """Clear the trace log file""" - try: - import os - - # Get log directory from configuration - logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./") - if not os.path.isabs(logDir): - # If relative path, make it relative to the gateway directory - gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - logDir = os.path.join(gatewayDir, logDir) - - # Create trace file path - traceFile = os.path.join(logDir, "log_trace.log") - - # Clear the trace file - if os.path.exists(traceFile): - with open(traceFile, "w", encoding="utf-8") as f: - f.write("") - logger.info("Trace log cleared") - else: - logger.info("Trace log file does not exist, nothing to clear") - - except Exception as e: - logger.error(f"Error clearing trace log: {str(e)}") + async def prepareTaskHandover(self, taskStep, taskActions, taskResult, workflow): """Prepare task handover data for workflow coordination"""