From 1917c007214b6bfe9c3699528ef39e270a31d127 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sat, 18 Oct 2025 02:29:03 +0200 Subject: [PATCH] Streamlined UI Log messages --- modules/interfaces/interfaceDbChatObjects.py | 1 - .../serviceAi/subDocumentGeneration.py | 70 ++++++- .../serviceWorkflow/mainServiceWorkflow.py | 6 +- modules/shared/progressLogger.py | 125 ++++++++++++ modules/shared/progressLogger_example.py | 183 ++++++++++++++++++ modules/workflows/methods/methodAi.py | 43 ++++ .../workflows/processing/workflowProcessor.py | 51 ++++- modules/workflows/workflowManager.py | 12 -- 8 files changed, 473 insertions(+), 18 deletions(-) create mode 100644 modules/shared/progressLogger.py create mode 100644 modules/shared/progressLogger_example.py diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index f2900bbf..ce39a1e5 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -839,7 +839,6 @@ class ChatObjects: # Return validated ChatLog instance return ChatLog(**createdLog) - # Stats methods def getStats(self, workflowId: str) -> List[ChatStat]: diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py index eafed975..8315a592 100644 --- a/modules/services/serviceAi/subDocumentGeneration.py +++ b/modules/services/serviceAi/subDocumentGeneration.py @@ -81,7 +81,22 @@ class SubDocumentGeneration: generationPrompt: Optional[str] = None ) -> Dict[str, Any]: """Handle single-file document generation (existing functionality).""" + import time + + # Create progress logger + workflow = self.services.currentWorkflow + progressLogger = self.services.workflow.createProgressLogger(workflow) + operationId = f"docGenSingle_{workflow.id}_{int(time.time())}" + try: + # Start progress tracking + progressLogger.startOperation( + operationId, + "Generate", + "Single-file Generation", + f"Processing {len(documents) if documents else 0} documents" + ) + # Get format-specific extraction prompt from generation service from modules.services.serviceGeneration.mainServiceGeneration import GenerationService generation_service = GenerationService(self.services) @@ -90,6 +105,9 @@ class SubDocumentGeneration: if not title: title = "AI Generated Document" + # Update progress - generating extraction prompt + progressLogger.updateProgress(operationId, 0.1, "Generating prompt") + # Get format-specific extraction prompt extractionPrompt = await generation_service.getExtractionPrompt( outputFormat=outputFormat, @@ -98,9 +116,15 @@ class SubDocumentGeneration: aiService=self ) + # Update progress - starting AI processing + progressLogger.updateProgress(operationId, 0.3, "AI processing") + # Process documents with format-specific prompt using JSON mode # This ensures structured JSON output instead of text aiResponseJson = await self._callAiJson(extractionPrompt, documents, options) + + # Update progress - AI processing completed + progressLogger.updateProgress(operationId, 0.6, "Processing done") # Validate JSON response if not isinstance(aiResponseJson, dict) or "sections" not in aiResponseJson: @@ -245,8 +269,10 @@ class SubDocumentGeneration: safeTitle = ''.join(c if c.isalnum() else '-' for c in (title or 'document')).strip('-') filename = f"{safeTitle or 'document'}-{timestamp}.{outputFormat}" - # Return structured result with document information - return { + # Update progress - generation completed + progressLogger.updateProgress(operationId, 0.9, "Rendering") + + result = { "success": True, "content": aiResponseJson, # Structured JSON document "rendered_content": renderedContent, # Formatted content @@ -262,8 +288,15 @@ class SubDocumentGeneration: "is_multi_file": False } + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + + return result + except Exception as e: logger.error(f"Error in single-file document generation: {str(e)}") + # Complete progress tracking with failure + progressLogger.completeOperation(operationId, False) raise async def _callAiWithMultiFileGeneration( @@ -276,7 +309,22 @@ class SubDocumentGeneration: prompt_analysis: Dict[str, Any] ) -> Dict[str, Any]: """Handle multi-file document generation using AI analysis.""" + import time + + # Create progress logger + workflow = self.services.currentWorkflow + progressLogger = self.services.workflow.createProgressLogger(workflow) + operationId = f"docGen_{workflow.id}_{int(time.time())}" + try: + # Start progress tracking + progressLogger.startOperation( + operationId, + "Generate", + "Multi-file Generation", + f"Processing {len(documents) if documents else 0} documents" + ) + # Get multi-file extraction prompt based on AI analysis from modules.services.serviceGeneration.mainServiceGeneration import GenerationService generation_service = GenerationService(self.services) @@ -285,6 +333,9 @@ class SubDocumentGeneration: if not title: title = "AI Generated Documents" + # Update progress - generating extraction prompt + progressLogger.updateProgress(operationId, 0.1, "Generating prompt") + # Get adaptive extraction prompt extraction_prompt = await generation_service.getAdaptiveExtractionPrompt( outputFormat=outputFormat, @@ -297,6 +348,9 @@ class SubDocumentGeneration: logger.info(f"Adaptive extraction prompt length: {len(extraction_prompt)} characters") logger.debug(f"Adaptive extraction prompt preview: {extraction_prompt[:500]}...") + # Update progress - starting document processing + progressLogger.updateProgress(operationId, 0.2, "Processing docs") + # Process with adaptive JSON schema - use the existing pipeline but with adaptive prompt logger.info(f"Using adaptive prompt with existing pipeline: {len(extraction_prompt)} chars") logger.debug(f"Processing documents: {len(documents) if documents else 0} documents") @@ -551,7 +605,10 @@ class SubDocumentGeneration: except Exception as e: logger.warning(f"Failed to save multi-file debug output: {e}") - return { + # Update progress - generation completed + progressLogger.updateProgress(operationId, 0.9, "Rendering") + + result = { "success": True, "content": ai_response, "rendered_content": None, # Not applicable for multi-file @@ -564,8 +621,15 @@ class SubDocumentGeneration: "split_strategy": prompt_analysis.get("strategy", "custom") } + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + + return result + except Exception as e: logger.error(f"Error in multi-file document generation: {str(e)}") + # Complete progress tracking with failure + progressLogger.completeOperation(operationId, False) # Fallback to single-file return await self._callAiWithSingleFileGeneration( prompt, documents, options, outputFormat, title diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index 589971c6..ff814ac2 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -9,6 +9,7 @@ from modules.services.serviceGeneration.subDocumentUtility import getFileExtensi from modules.shared.timezoneUtils import get_utc_timestamp from modules.services.serviceAi.mainServiceAi import AiService from modules.security.tokenManager import TokenManager +from modules.shared.progressLogger import ProgressLogger logger = logging.getLogger(__name__) @@ -902,4 +903,7 @@ class WorkflowService: return [] except Exception as e: logger.error(f"Error getting connection reference list: {str(e)}") - return [] \ No newline at end of file + return [] + + def createProgressLogger(self, workflow) -> ProgressLogger: + return ProgressLogger(self, workflow) \ No newline at end of file diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py new file mode 100644 index 00000000..daa573c6 --- /dev/null +++ b/modules/shared/progressLogger.py @@ -0,0 +1,125 @@ +""" +Progress Logger utility for standardized progress reporting in workflows. +Provides centralized progress management with start/update/complete methods. +""" + +import time +import logging +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + + +class ProgressLogger: + """Centralized progress logger for workflow operations.""" + + def __init__(self, workflowService, workflow): + """Initialize progress logger. + + Args: + workflowService: WorkflowService instance for logging + workflow: Workflow object to get workflowId from + """ + self.workflowService = workflowService + self.workflow = workflow + self.activeOperations = {} + + def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = ""): + """Start a new long-running operation. + + Args: + operationId: Unique identifier for the operation + serviceName: Name of the service (e.g., "Extract", "AI", "Generate") + actionName: Name of the action being performed + context: Additional context information + """ + self.activeOperations[operationId] = { + 'service': serviceName, + 'action': actionName, + 'context': context, + 'startTime': time.time() + } + self._logProgress(operationId, 0.0, f"Starting {actionName}") + logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}") + + def updateProgress(self, operationId: str, progress: float, statusUpdate: str = ""): + """Update progress for an operation. + + Args: + operationId: Unique identifier for the operation + progress: Progress value between 0.0 and 1.0 + statusUpdate: Additional status information + """ + if operationId not in self.activeOperations: + logger.warning(f"Operation {operationId} not found for progress update") + return + + op = self.activeOperations[operationId] + context = f"{op['context']} {statusUpdate}".strip() + self._logProgress(operationId, progress, context) + logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}") + + def completeOperation(self, operationId: str, success: bool = True): + """Complete an operation. + + Args: + operationId: Unique identifier for the operation + success: Whether the operation completed successfully + """ + if operationId not in self.activeOperations: + logger.warning(f"Operation {operationId} not found for completion") + return + + op = self.activeOperations[operationId] + finalProgress = 1.0 if success else 0.0 + status = "Done" if success else "Failed" + + # Create completion log BEFORE removing from activeOperations + self._logProgress(operationId, finalProgress, status) + + # Log completion time + duration = time.time() - op['startTime'] + logger.info(f"Completed operation {operationId}: {op['service']} - {op['action']} in {duration:.2f}s") + + # Remove from active operations AFTER creating the log + del self.activeOperations[operationId] + + def _logProgress(self, operationId: str, progress: float, status: str): + """Create standardized ChatLog entry. + + Args: + operationId: Unique identifier for the operation + progress: Progress value between 0.0 and 1.0 + status: Status information for the log entry + """ + if operationId not in self.activeOperations: + return + + op = self.activeOperations[operationId] + message = f"Service {op['service']}" + + logData = { + "workflowId": self.workflow.id, + "message": message, + "type": "info", + "status": status, + "progress": progress + } + + try: + self.workflowService.storeLog(self.workflow, logData) + except Exception as e: + logger.error(f"Failed to store progress log: {e}") + + def getActiveOperations(self) -> Dict[str, Dict[str, Any]]: + """Get all currently active operations. + + Returns: + Dictionary of active operations + """ + return self.activeOperations.copy() + + def clearAllOperations(self): + """Clear all active operations (for cleanup).""" + self.activeOperations.clear() + logger.debug("Cleared all active operations") diff --git a/modules/shared/progressLogger_example.py b/modules/shared/progressLogger_example.py new file mode 100644 index 00000000..13146846 --- /dev/null +++ b/modules/shared/progressLogger_example.py @@ -0,0 +1,183 @@ +""" +Example usage of ProgressLogger for workflow progress tracking. + +This file demonstrates how to use the ProgressLogger utility to track +progress of long-running operations in workflows. +""" + +import asyncio +import time +from modules.shared.progressLogger import ProgressLogger + + +async def exampleDocumentProcessing(workflowService, workflow): + """Example of document processing with progress tracking.""" + + # Create progress logger + progressLogger = workflowService.createProgressLogger(workflow) + operationId = f"docProcess_{workflow.id}_{int(time.time())}" + + try: + # Start operation + progressLogger.startOperation( + operationId, + "Extract", + "Document Processing", + "Processing 5 documents" + ) + + # Simulate processing steps + documents = ["doc1.pdf", "doc2.docx", "doc3.txt", "doc4.xlsx", "doc5.pdf"] + + for i, doc in enumerate(documents): + # Update progress for each document + progress = (i + 1) / len(documents) + progressLogger.updateProgress( + operationId, + progress, + f"Processing {doc} ({i+1}/{len(documents)})" + ) + + # Simulate processing time + await asyncio.sleep(0.5) + + # Complete operation + progressLogger.completeOperation(operationId, True) + + except Exception as e: + # Complete with failure + progressLogger.completeOperation(operationId, False) + raise + + +async def exampleAiProcessing(workflowService, workflow): + """Example of AI processing with chunk progress tracking.""" + + progressLogger = workflowService.createProgressLogger(workflow) + operationId = f"aiProcess_{workflow.id}_{int(time.time())}" + + try: + # Start operation + progressLogger.startOperation( + operationId, + "AI", + "Content Analysis", + "Processing 10 chunks" + ) + + # Simulate AI processing with chunks + chunks = list(range(1, 11)) + + for i, chunk in enumerate(chunks): + progress = (i + 1) / len(chunks) + progressLogger.updateProgress( + operationId, + progress, + f"Processing chunk {chunk} of {len(chunks)}" + ) + + # Simulate AI processing time + await asyncio.sleep(0.3) + + # Complete operation + progressLogger.completeOperation(operationId, True) + + except Exception as e: + progressLogger.completeOperation(operationId, False) + raise + + +async def exampleWorkflowTask(workflowService, workflow): + """Example of workflow task execution with progress tracking.""" + + progressLogger = workflowService.createProgressLogger(workflow) + operationId = f"workflowTask_{workflow.id}_{int(time.time())}" + + try: + # Start operation + progressLogger.startOperation( + operationId, + "Workflow", + "Task Execution", + "Executing data analysis task" + ) + + # Simulate task steps + steps = [ + "Initializing analysis", + "Loading data", + "Processing data", + "Generating results", + "Saving output" + ] + + for i, step in enumerate(steps): + progress = (i + 1) / len(steps) + progressLogger.updateProgress( + operationId, + progress, + step + ) + + # Simulate step processing time + await asyncio.sleep(0.4) + + # Complete operation + progressLogger.completeOperation(operationId, True) + + except Exception as e: + progressLogger.completeOperation(operationId, False) + raise + + +# Example of how to integrate into existing services +class ExampleService: + """Example service showing integration with ProgressLogger.""" + + def __init__(self, workflowService): + self.workflowService = workflowService + + async def processWithProgress(self, workflow, data): + """Process data with progress tracking.""" + + progressLogger = self.workflowService.createProgressLogger(workflow) + operationId = f"example_{workflow.id}_{int(time.time())}" + + try: + # Start operation + progressLogger.startOperation( + operationId, + "Example", + "Data Processing", + f"Processing {len(data)} items" + ) + + # Process data in chunks + chunkSize = 10 + totalChunks = (len(data) + chunkSize - 1) // chunkSize + + for i in range(0, len(data), chunkSize): + chunk = data[i:i + chunkSize] + chunkNum = i // chunkSize + 1 + + progress = chunkNum / totalChunks + progressLogger.updateProgress( + operationId, + progress, + f"Processing chunk {chunkNum}/{totalChunks}" + ) + + # Process chunk + await self._processChunk(chunk) + + # Complete operation + progressLogger.completeOperation(operationId, True) + + except Exception as e: + progressLogger.completeOperation(operationId, False) + raise + + async def _processChunk(self, chunk): + """Process a chunk of data.""" + # Simulate processing + await asyncio.sleep(0.1) diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index eba17bcb..c820114e 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -48,6 +48,19 @@ class MethodAi(MethodBase): - requiredTags (list, optional): Capability tags (e.g., text, chat, reasoning, analysis, image, vision, web, search). """ try: + # Create progress logger + import time + progressLogger = self.services.workflow.createProgressLogger(self.services.currentWorkflow) + operationId = f"ai_process_{self.services.currentWorkflow.id}_{int(time.time())}" + + # Start progress tracking + progressLogger.startOperation( + operationId, + "Generate", + "AI Processing", + f"Format: {parameters.get('resultType', 'txt')}" + ) + # Debug logging to see what parameters are received logger.info(f"MethodAi.process received parameters: {parameters}") logger.info(f"Parameters type: {type(parameters)}") @@ -56,6 +69,9 @@ class MethodAi(MethodBase): aiPrompt = parameters.get("aiPrompt") logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})") + # Update progress - preparing parameters + progressLogger.updateProgress(operationId, 0.2, "Preparing parameters") + documentList = parameters.get("documentList", []) if isinstance(documentList, str): documentList = [documentList] @@ -80,6 +96,9 @@ class MethodAi(MethodBase): output_mime_type = "application/octet-stream" # Prefer service-provided mimeType when available logger.info(f"Using result type: {resultType} -> {output_extension}") + # Update progress - preparing documents + progressLogger.updateProgress(operationId, 0.3, "Preparing documents") + # Get ChatDocuments for AI service - let AI service handle all document processing chatDocuments = [] if documentList: @@ -87,6 +106,9 @@ class MethodAi(MethodBase): if chatDocuments: logger.info(f"Prepared {len(chatDocuments)} documents for AI processing") + # Update progress - building prompt + progressLogger.updateProgress(operationId, 0.4, "Building prompt") + # Build enhanced prompt enhanced_prompt = aiPrompt @@ -120,12 +142,18 @@ class MethodAi(MethodBase): supported_generation_formats = {"html", "pdf", "docx", "txt", "md", "json", "csv", "xlsx"} output_format_arg = output_format if output_format in supported_generation_formats else None + # Update progress - calling AI + progressLogger.updateProgress(operationId, 0.6, "Calling AI") + result = await self.services.ai.callAi( prompt=enhanced_prompt, documents=chatDocuments if chatDocuments else None, options=options, outputFormat=output_format_arg ) + + # Update progress - processing result + progressLogger.updateProgress(operationId, 0.8, "Processing result") from modules.datamodels.datamodelChat import ActionDocument @@ -137,6 +165,10 @@ class MethodAi(MethodBase): documentData=d.get("documentData"), mimeType=d.get("mimeType") or output_mime_type )) + + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + return ActionResult.isSuccess(documents=action_documents) extension = output_extension.lstrip('.') @@ -150,10 +182,21 @@ class MethodAi(MethodBase): documentData=result, mimeType=output_mime_type ) + + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + return ActionResult.isSuccess(documents=[action_document]) except Exception as e: logger.error(f"Error in AI processing: {str(e)}") + + # Complete progress tracking with failure + try: + progressLogger.completeOperation(operationId, False) + except: + pass # Don't fail on progress logging errors + return ActionResult.isFailure( error=str(e) ) diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 1b46a72c..787edbe1 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -47,10 +47,24 @@ class WorkflowProcessor: async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan: """Generate a high-level task plan for the workflow""" + import time + + # Create progress logger + progressLogger = self.services.workflow.createProgressLogger(workflow) + operationId = f"taskPlan_{workflow.id}_{int(time.time())}" + try: # Check workflow status before generating task plan self._checkWorkflowStopped(workflow) + # Start progress tracking + progressLogger.startOperation( + operationId, + "Workflow Planning", + "Task Plan Generation", + f"Mode: {workflow.workflowMode}" + ) + # Initialize currentUserLanguage to empty at workflow start self.services.currentUserLanguage = "" @@ -59,32 +73,67 @@ class WorkflowProcessor: logger.info(f"User Input: {userInput}") logger.info(f"Workflow Mode: {workflow.workflowMode}") + # Update progress - generating task plan + progressLogger.updateProgress(operationId, 0.3, "Analyzing input") + # Delegate to the appropriate mode taskPlan = await self.mode.generateTaskPlan(userInput, workflow) + # Update progress - creating task plan message + progressLogger.updateProgress(operationId, 0.8, "Creating plan") + # Create task plan message await self.mode.createTaskPlanMessage(taskPlan, workflow) + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + return taskPlan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") + # Complete progress tracking with failure + progressLogger.completeOperation(operationId, False) raise async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext, taskIndex: int = None, totalTasks: int = None) -> TaskResult: """Execute a task step using the appropriate mode""" + import time + + # Create progress logger + progressLogger = self.services.workflow.createProgressLogger(workflow) + operationId = f"taskExec_{workflow.id}_{taskIndex}_{int(time.time())}" + try: # Check workflow status before executing task self._checkWorkflowStopped(workflow) + # Start progress tracking + progressLogger.startOperation( + operationId, + "Workflow Execution", + "Task Execution", + f"Task {taskIndex}/{totalTasks}" + ) + logger.info(f"=== STARTING TASK EXECUTION ===") logger.info(f"Task: {taskStep.objective}") logger.info(f"Mode: {workflow.workflowMode}") + # Update progress - executing task + progressLogger.updateProgress(operationId, 0.2, "Executing") + # Delegate to the appropriate mode - return await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks) + result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks) + + # Complete progress tracking + progressLogger.completeOperation(operationId, True) + + return result except Exception as e: logger.error(f"Error in executeTask: {str(e)}") + # Complete progress tracking with failure + progressLogger.completeOperation(operationId, False) raise async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow, diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 88899e49..6808f0fa 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -257,18 +257,6 @@ class WorkflowManager: except Exception: pass - # Telemetry (sizes and counts) - try: - inputSize = len(userInput.prompt.encode('utf-8')) if userInput and userInput.prompt else 0 - outputSize = len(aiResponse.encode('utf-8')) if aiResponse else 0 - self.services.workflow.storeLog(workflow, { - "message": f"User prompt analyzed (input {inputSize} bytes, output {outputSize} bytes, items {len(contextItems)})", - "type": "info", - "status": "running", - "progress": 0 - }) - except Exception: - pass # Create documents for context items if contextItems and isinstance(contextItems, list):