From 26b2109844e13e2ef773a7688959dcabd3a92802 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 29 Oct 2025 23:40:39 +0100
Subject: [PATCH] UI progress tracking integrated
---
modules/services/serviceAi/subCoreAi.py | 302 +++++++++---------
.../serviceAi/subDocumentProcessing.py | 88 +++--
.../mainServiceGeneration.py | 18 --
.../serviceGeneration/renderers/registry.py | 2 -
.../subPromptBuilderGeneration.py | 2 +-
.../serviceWorkflow/mainServiceWorkflow.py | 1 -
modules/workflows/methods/methodAi.py | 45 +--
test4_method_ai_operations.py | 13 +-
8 files changed, 243 insertions(+), 228 deletions(-)
diff --git a/modules/services/serviceAi/subCoreAi.py b/modules/services/serviceAi/subCoreAi.py
index 4e9d1bf6..e35af0d0 100644
--- a/modules/services/serviceAi/subCoreAi.py
+++ b/modules/services/serviceAi/subCoreAi.py
@@ -126,7 +126,8 @@ Respond with ONLY a JSON object in this exact format:
options: AiCallOptions,
debugPrefix: str = "ai_call",
promptBuilder: Optional[callable] = None,
- promptArgs: Optional[Dict[str, Any]] = None
+ promptArgs: Optional[Dict[str, Any]] = None,
+ operationId: Optional[str] = None
) -> str:
"""
Shared core function for AI calls with repair-based looping system.
@@ -136,6 +137,9 @@ Respond with ONLY a JSON object in this exact format:
prompt: The prompt to send to AI
options: AI call configuration options
debugPrefix: Prefix for debug file names
+ promptBuilder: Optional function to rebuild prompts for continuation
+ promptArgs: Optional arguments for prompt builder
+ operationId: Optional operation ID for progress tracking
Returns:
Complete AI response after all iterations
@@ -145,31 +149,35 @@ Respond with ONLY a JSON object in this exact format:
allSections = [] # Accumulate all sections across iterations
lastRawResponse = None # Store last raw JSON response for continuation
- logger.debug(f"Starting AI call with repair-based looping (debug prefix: {debugPrefix})")
-
while iteration < max_iterations:
iteration += 1
- logger.debug(f"AI call iteration {iteration}/{max_iterations}")
+
+ # Update progress for iteration start
+ if operationId:
+ if iteration == 1:
+ self.services.workflow.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}")
+ else:
+ # For continuation iterations, show progress incrementally
+                base_progress = 0.5 + (min(iteration - 1, max_iterations) / max_iterations * 0.4)  # Advances from 0.5 toward 0.9 as iterations proceed
+ self.services.workflow.progressLogUpdate(operationId, base_progress, f"Continuing generation (iteration {iteration})")
# Build iteration prompt
if len(allSections) > 0 and promptBuilder and promptArgs:
# This is a continuation - build continuation context with raw JSON and rebuild prompt
continuationContext = buildContinuationContext(allSections, lastRawResponse)
- logger.info(f"Continuation context: {continuationContext.get('section_count')} sections")
- if lastRawResponse:
- logger.debug(f"Iteration {iteration}: Including previous response in continuation context ({len(lastRawResponse)} chars)")
- else:
+ if not lastRawResponse:
logger.warning(f"Iteration {iteration}: No previous response available for continuation!")
# Rebuild prompt with continuation context using the provided prompt builder
iterationPrompt = await promptBuilder(**promptArgs, continuationContext=continuationContext)
- logger.debug(f"Rebuilt prompt with continuation context for iteration {iteration}")
else:
# First iteration - use original prompt
iterationPrompt = prompt
# Make AI call
try:
+ if operationId and iteration == 1:
+ self.services.workflow.progressLogUpdate(operationId, 0.51, "Calling AI model")
from modules.datamodels.datamodelAi import AiCallRequest
request = AiCallRequest(
prompt=iterationPrompt,
@@ -186,12 +194,13 @@ Respond with ONLY a JSON object in this exact format:
response = await self.aiObjects.call(request)
result = response.content
- # Debug: Check response immediately from API
- if iteration == 1 and result:
- first_chars = result[:200].replace('\n', '\\n').replace('\r', '\\r')
- logger.debug(f"Iteration 1: Raw API response starts with (first 200 chars): '{first_chars}'")
- if result.strip().startswith('},') or result.strip().startswith('],'):
- logger.error(f"Iteration 1: API returned fragment! Full start: '{result[:200]}'")
+ # Update progress after AI call
+ if operationId:
+ if iteration == 1:
+ self.services.workflow.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})")
+ else:
+ progress = 0.6 + (min(iteration - 1, 10) * 0.03)
+ self.services.workflow.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})")
# Write raw AI response to debug file
if iteration == 1:
@@ -216,11 +225,16 @@ Respond with ONLY a JSON object in this exact format:
# Check for complete_response flag in raw response (before parsing)
import re
if re.search(r'"complete_response"\s*:\s*true', result, re.IGNORECASE):
- logger.info(f"Iteration {iteration}: Detected complete_response flag in raw response")
+ pass # Flag detected, will stop in _shouldContinueGeneration
# Extract sections from response (handles both valid and broken JSON)
extractedSections, wasJsonComplete = self._extractSectionsFromResponse(result, iteration, debugPrefix)
+ # Update progress after parsing
+ if operationId:
+ if extractedSections:
+ self.services.workflow.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})")
+
if not extractedSections:
# If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry
if iteration > 1 and not wasJsonComplete:
@@ -232,15 +246,14 @@ Respond with ONLY a JSON object in this exact format:
# Add new sections to accumulator
allSections.extend(extractedSections)
- logger.info(f"Iteration {iteration}: Extracted {len(extractedSections)} sections (total: {len(allSections)})")
# Check if we should continue (completion detection)
if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result):
- logger.debug(f"Iteration {iteration}: Continuing generation")
continue
else:
# Done - build final result
- logger.info(f"Iteration {iteration}: Generation complete")
+ if operationId:
+ self.services.workflow.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)")
break
except Exception as e:
@@ -256,7 +269,6 @@ Respond with ONLY a JSON object in this exact format:
# Write final result to debug file
self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
- logger.info(f"AI call completed: {len(allSections)} total sections from {iteration} iterations")
return final_result
def _extractSectionsFromResponse(
@@ -278,12 +290,9 @@ Respond with ONLY a JSON object in this exact format:
# Check if AI marked response as complete
isComplete = parsed_result.get("complete_response", False) == True
- if isComplete:
- logger.info(f"Iteration {iteration}: AI marked response as complete (complete_response: true)")
# Extract sections from parsed JSON
sections = extractSectionsFromDocument(parsed_result)
- logger.debug(f"Iteration {iteration}: Valid JSON - extracted {len(sections)} sections")
# If AI marked as complete, always return as complete
if isComplete:
@@ -291,9 +300,7 @@ Respond with ONLY a JSON object in this exact format:
# If in continuation mode (iteration > 1), continuation responses are expected to be fragments
# A fragment with 0 extractable sections means JSON is incomplete - need another iteration
- # Don't use repair mechanism - just mark as incomplete so loop continues
if len(sections) == 0 and iteration > 1:
- logger.info(f"Iteration {iteration}: Continuation fragment with 0 extractable sections - JSON incomplete, continuing")
return sections, False # Mark as incomplete so loop continues
# First iteration with 0 sections means empty response - stop
@@ -304,7 +311,6 @@ Respond with ONLY a JSON object in this exact format:
except json.JSONDecodeError as e:
# Broken JSON - try repair mechanism (normal in iterative generation)
- logger.info(f"Iteration {iteration}: JSON incomplete/broken, attempting repair: {str(e)}")
self.services.utils.writeDebugFile(result, f"{debugPrefix}_broken_json_iteration_{iteration}")
# Try to repair
@@ -313,7 +319,6 @@ Respond with ONLY a JSON object in this exact format:
if repaired_json:
# Extract sections from repaired JSON
sections = extractSectionsFromDocument(repaired_json)
- logger.info(f"Iteration {iteration}: Repaired JSON - extracted {len(sections)} sections")
return sections, False # JSON was broken but repaired
else:
# Repair failed - log error
@@ -341,18 +346,14 @@ Respond with ONLY a JSON object in this exact format:
# Check for complete_response flag in raw response
if rawResponse:
import re
- # Look for complete_response: true pattern (allowing for whitespace variations)
if re.search(r'"complete_response"\s*:\s*true', rawResponse, re.IGNORECASE):
- logger.info("AI marked response as complete (complete_response: true) - stopping generation")
return False
# If JSON was complete (and no complete_response flag), we're done
# If JSON was broken and repaired, continue to get more content
if wasJsonComplete:
- logger.info("JSON was complete - stopping generation")
return False
else:
- logger.info("JSON was broken/repaired - continuing generation")
return True
def _buildFinalResultFromSections(
@@ -407,7 +408,6 @@ Respond with ONLY a JSON object in this exact format:
Planning JSON response
"""
# Planning calls always use static parameters
- logger.debug("Using static parameters for planning call")
options = AiCallOptions(
operationType=OperationTypeEnum.PLAN,
priority=PriorityEnum.QUALITY,
@@ -449,110 +449,138 @@ Respond with ONLY a JSON object in this exact format:
Returns:
AI response as string, or dict with documents if outputFormat is specified
"""
- if options is None or (hasattr(options, 'operationType') and options.operationType is None):
- # Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None)
- logger.debug("Analyzing prompt to determine optimal parameters")
- options = await self._analyzePromptAndCreateOptions(prompt)
- else:
- logger.debug(f"Using provided options: operationType={options.operationType}, priority={options.priority}")
+ # Create separate operationId for detailed progress tracking
+ import time
+ import uuid
+ workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+ aiOperationId = f"ai_documents_{workflowId}_{int(time.time())}"
- # CRITICAL: For document generation with JSON templates, NEVER compress the prompt
- # Compressing would truncate the template structure and confuse the AI
- if outputFormat: # Document generation with structured output
- if not options:
- options = AiCallOptions()
- options.compressPrompt = False # JSON templates must NOT be truncated
- options.compressContext = False # Context also should not be compressed
- logger.debug("Document generation detected - disabled prompt/context compression")
+ # Start progress tracking for this operation
+ self.services.workflow.progressLogStart(
+ aiOperationId,
+ "AI call with documents",
+ "Document Generation",
+ f"Format: {outputFormat or 'text'}"
+ )
- # Handle document generation with specific output format using unified approach
- if outputFormat:
- # Use unified generation method for all document generation
- if documents and len(documents) > 0:
- logger.debug(f"Extracting content from {len(documents)} documents")
- extracted_content = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
- else:
- logger.debug("No documents provided - using direct generation")
- extracted_content = None
- logger.debug(f"[DEBUG] title value: {title}, type: {type(title)}")
- from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
- # First call without continuation context
- generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None)
+ try:
+ if options is None or (hasattr(options, 'operationType') and options.operationType is None):
+ # Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None)
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters")
+ options = await self._analyzePromptAndCreateOptions(prompt)
- # Prepare prompt builder arguments for continuation
- promptArgs = {
- "outputFormat": outputFormat,
- "userPrompt": prompt,
- "title": title,
- "extracted_content": extracted_content
- }
+ # CRITICAL: For document generation with JSON templates, NEVER compress the prompt
+ # Compressing would truncate the template structure and confuse the AI
+ if outputFormat: # Document generation with structured output
+ if not options:
+ options = AiCallOptions()
+ options.compressPrompt = False # JSON templates must NOT be truncated
+ options.compressContext = False # Context also should not be compressed
- generated_json = await self._callAiWithLooping(
- generation_prompt,
- options,
- "document_generation",
- buildGenerationPrompt,
- promptArgs
- )
-
- # Parse the generated JSON (extract fenced/embedded JSON first)
- try:
- extracted_json = self.services.utils.jsonExtractString(generated_json)
- generated_data = json.loads(extracted_json)
- except json.JSONDecodeError as e:
- logger.error(f"Failed to parse generated JSON: {str(e)}")
- logger.error(f"JSON content length: {len(generated_json)}")
- logger.error(f"JSON content preview (last 200 chars): ...{generated_json[-200:]}")
- logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}")
+ # Handle document generation with specific output format using unified approach
+ if outputFormat:
+ # Use unified generation method for all document generation
+ if documents and len(documents) > 0:
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents")
+ extracted_content = await self.services.ai.documentProcessor.callAiText(prompt, documents, options, aiOperationId)
+ else:
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation")
+ extracted_content = None
- # Write the problematic JSON to debug file
- self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt")
+ from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
+ # First call without continuation context
+ generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None)
- return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}
-
- # Render to final format using the existing renderer
- try:
- from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
- generationService = GenerationService(self.services)
- rendered_content, mime_type = await generationService.renderReport(
- generated_data, outputFormat, title or "Generated Document", prompt, self
- )
-
- # Build result in the expected format
- result = {
- "success": True,
- "content": generated_data,
- "documents": [{
- "documentName": f"generated.{outputFormat}",
- "documentData": rendered_content,
- "mimeType": mime_type,
- "title": title or "Generated Document"
- }],
- "is_multi_file": False,
- "format": outputFormat,
+ # Prepare prompt builder arguments for continuation
+ promptArgs = {
+ "outputFormat": outputFormat,
+ "userPrompt": prompt,
"title": title,
- "split_strategy": "single",
- "total_documents": 1,
- "processed_documents": 1
+ "extracted_content": extracted_content
}
- # Log AI response for debugging
- self.services.utils.writeDebugFile(str(result), "document_generation_response", documents)
- return result
-
- except Exception as e:
- logger.error(f"Error rendering document: {str(e)}")
- return {"success": False, "error": f"Rendering failed: {str(e)}"}
-
- # Handle text calls (no output format specified)
- if documents:
- # Use document processing for text calls with documents
- result = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
- else:
- # Use shared core function for direct text calls
- result = await self._callAiWithLooping(prompt, options, "text")
-
- return result
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation")
+ generated_json = await self._callAiWithLooping(
+ generation_prompt,
+ options,
+ "document_generation",
+ buildGenerationPrompt,
+ promptArgs,
+ aiOperationId
+ )
+
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON")
+ # Parse the generated JSON (extract fenced/embedded JSON first)
+ try:
+ extracted_json = self.services.utils.jsonExtractString(generated_json)
+ generated_data = json.loads(extracted_json)
+ except json.JSONDecodeError as e:
+ logger.error(f"Failed to parse generated JSON: {str(e)}")
+ logger.error(f"JSON content length: {len(generated_json)}")
+ logger.error(f"JSON content preview (last 200 chars): ...{generated_json[-200:]}")
+ logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}")
+
+ # Write the problematic JSON to debug file
+ self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
+
+ self.services.workflow.progressLogFinish(aiOperationId, False)
+ return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}
+
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format")
+ # Render to final format using the existing renderer
+ try:
+ from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
+ generationService = GenerationService(self.services)
+ rendered_content, mime_type = await generationService.renderReport(
+ generated_data, outputFormat, title or "Generated Document", prompt, self
+ )
+
+ # Build result in the expected format
+ result = {
+ "success": True,
+ "content": generated_data,
+ "documents": [{
+ "documentName": f"generated.{outputFormat}",
+ "documentData": rendered_content,
+ "mimeType": mime_type,
+ "title": title or "Generated Document"
+ }],
+ "is_multi_file": False,
+ "format": outputFormat,
+ "title": title,
+ "split_strategy": "single",
+ "total_documents": 1,
+ "processed_documents": 1
+ }
+
+ # Log AI response for debugging
+ self.services.utils.writeDebugFile(str(result), "document_generation_response", documents)
+
+ self.services.workflow.progressLogFinish(aiOperationId, True)
+ return result
+
+ except Exception as e:
+ logger.error(f"Error rendering document: {str(e)}")
+ self.services.workflow.progressLogFinish(aiOperationId, False)
+ return {"success": False, "error": f"Rendering failed: {str(e)}"}
+
+ # Handle text calls (no output format specified)
+ self.services.workflow.progressLogUpdate(aiOperationId, 0.5, "Processing text call")
+ if documents:
+ # Use document processing for text calls with documents
+ result = await self.services.ai.documentProcessor.callAiText(prompt, documents, options, aiOperationId)
+ else:
+ # Use shared core function for direct text calls
+ result = await self._callAiWithLooping(prompt, options, "text", None, None, aiOperationId)
+
+ self.services.workflow.progressLogFinish(aiOperationId, True)
+ return result
+
+ except Exception as e:
+ logger.error(f"Error in callAiDocuments: {str(e)}")
+ self.services.workflow.progressLogFinish(aiOperationId, False)
+ raise
# AI Image Analysis
@@ -568,12 +596,9 @@ Respond with ONLY a JSON object in this exact format:
# Check if imageData is valid
if not imageData:
error_msg = "No image data provided"
- self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {error_msg}")
return f"Error: {error_msg}"
- self.services.utils.debugLogToFile(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}", "AI_SERVICE")
- logger.info(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}")
# Always use IMAGE_ANALYSE operation type for image processing
if options is None:
@@ -612,34 +637,17 @@ Respond with ONLY a JSON object in this exact format:
contentParts=[imagePart]
)
- self.services.utils.debugLogToFile(f"Calling aiObjects.call() with operationType: {options.operationType}", "AI_SERVICE")
- logger.info(f"Calling aiObjects.call() with operationType: {options.operationType}")
-
- # Write image analysis prompt to debug file
- self.services.utils.writeDebugFile(prompt, "image_analysis_prompt")
-
response = await self.aiObjects.call(request)
-
- # Write image analysis response to debug file
- # response is an AiCallResponse object
result = response.content
- self.services.utils.writeDebugFile(result, "image_analysis_response")
-
- # Debug the result
- self.services.utils.debugLogToFile(f"AI image analysis result type: {type(response)}, content length: {len(result)}", "AI_SERVICE")
# Check if result is valid
if not result or (isinstance(result, str) and not result.strip()):
error_msg = f"No response from AI image analysis (result: {repr(result)})"
- self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {error_msg}")
return f"Error: {error_msg}"
- self.services.utils.debugLogToFile(f"callImage returned: {result[:200]}..." if len(result) > 200 else result, "AI_SERVICE")
- logger.info(f"callImage returned: {result[:200]}..." if len(result) > 200 else result)
return result
except Exception as e:
- self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {str(e)}")
return f"Error: {str(e)}"
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
index d6f390ac..a9d01a8a 100644
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ b/modules/services/serviceAi/subDocumentProcessing.py
@@ -38,7 +38,8 @@ class SubDocumentProcessing:
self,
documents: List[ChatDocument],
prompt: str,
- options: Optional[AiCallOptions] = None
+ options: Optional[AiCallOptions] = None,
+ operationId: Optional[str] = None
) -> str:
"""
Process documents with model-aware chunking and merge results.
@@ -48,6 +49,7 @@ class SubDocumentProcessing:
documents: List of ChatDocument objects to process
prompt: AI prompt for processing
options: AI call options
+ operationId: Optional operation ID for progress tracking
Returns:
Merged AI results as string with preserved document structure
@@ -55,45 +57,69 @@ class SubDocumentProcessing:
if not documents:
return ""
- # Build extraction options using Pydantic model
- mergeStrategy = MergeStrategy(
- useIntelligentMerging=True,
- prompt=prompt,
- groupBy="typeGroup",
- orderBy="id",
- mergeType="concatenate"
- )
-
- extractionOptions = ExtractionOptions(
- prompt=prompt,
- operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
- processDocumentsIndividually=True,
- mergeStrategy=mergeStrategy
- )
-
- logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
+ # Create operationId if not provided
+ if not operationId:
+ import time
+ workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
+ operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
+ self.services.workflow.progressLogStart(
+ operationId,
+ "AI Text Extract",
+ "Document Processing",
+ f"Processing {len(documents)} documents"
+ )
try:
+ # Build extraction options using Pydantic model
+ mergeStrategy = MergeStrategy(
+ useIntelligentMerging=True,
+ prompt=prompt,
+ groupBy="typeGroup",
+ orderBy="id",
+ mergeType="concatenate"
+ )
+
+ extractionOptions = ExtractionOptions(
+ prompt=prompt,
+ operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
+ processDocumentsIndividually=True,
+ mergeStrategy=mergeStrategy
+ )
+
+ logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
+
# Extract content WITHOUT chunking
+ if operationId:
+ self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
+ if operationId:
+ self.services.workflow.progressLogFinish(operationId, False)
return "[Error: No extraction results]"
# Process parts (not chunks) with model-aware AI calls
- partResults = await self._processPartsWithMapping(extractionResult, prompt, options)
+ if operationId:
+ self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
+ partResults = await self._processPartsWithMapping(extractionResult, prompt, options, operationId)
# Merge results using existing merging system
+ if operationId:
+ self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
mergedContent = self._mergePartResults(partResults, options)
# Save merged extraction content to debug
self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
- return mergedContent
+ if operationId:
+ self.services.workflow.progressLogFinish(operationId, True)
+ return mergedContent
except Exception as e:
- logger.error(f"Error in per-chunk processing: {str(e)}")
- return f"[Error in per-chunk processing: {str(e)}]"
+ logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
+ if operationId:
+ self.services.workflow.progressLogFinish(operationId, False)
+ raise
async def processDocumentsPerChunkJson(
self,
@@ -466,19 +492,21 @@ CONTINUATION INSTRUCTIONS:
self,
prompt: str,
documents: Optional[List[ChatDocument]],
- options: AiCallOptions
+ options: AiCallOptions,
+ operationId: Optional[str] = None
) -> str:
"""
Handle text calls with document processing through ExtractionService.
UNIFIED PROCESSING: Always use per-chunk processing for consistency.
"""
- return await self.processDocumentsPerChunk(documents, prompt, options)
+ return await self.processDocumentsPerChunk(documents, prompt, options, operationId)
async def _processPartsWithMapping(
self,
extractionResult: List[ContentExtracted],
prompt: str,
- options: Optional[AiCallOptions] = None
+ options: Optional[AiCallOptions] = None,
+ operationId: Optional[str] = None
) -> List['PartResult']:
"""Process content parts with model-aware chunking and proper mapping."""
from modules.datamodels.datamodelExtraction import PartResult
@@ -505,7 +533,11 @@ CONTINUATION INSTRUCTIONS:
logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")
+ total_parts = len(parts_to_process)
+
# Process parts in parallel
+ processed_count = [0] # Use list to allow modification in nested function
+
async def process_single_part(part_info: Dict) -> PartResult:
part = part_info['part']
part_index = part_info['part_index']
@@ -523,6 +555,12 @@ CONTINUATION INSTRUCTIONS:
contentParts=[part] # Pass as list for unified processing
)
+ # Update progress before AI call
+ if operationId and total_parts > 0:
+ processed_count[0] += 1
+ progress = 0.3 + (processed_count[0] / total_parts * 0.6) # Progress from 0.3 to 0.9
+ self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processed_count[0]}/{total_parts}")
+
# Call AI with model-aware chunking
response = await self.aiObjects.call(request)
diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py
index 321856fb..41bce06d 100644
--- a/modules/services/serviceGeneration/mainServiceGeneration.py
+++ b/modules/services/serviceGeneration/mainServiceGeneration.py
@@ -34,11 +34,8 @@ class GenerationService:
documents = action_result.documents if action_result and hasattr(action_result, 'documents') else []
if not documents:
- logger.info(f"No documents found in action_result.documents for {action.execMethod}.{action.execAction}")
return []
- logger.info(f"Processing {len(documents)} documents from action_result.documents")
-
# Process each document from the AI action result
processed_documents = []
for doc in documents:
@@ -46,7 +43,6 @@ class GenerationService:
if processed_doc:
processed_documents.append(processed_doc)
- logger.info(f"Successfully processed {len(processed_documents)} documents")
return processed_documents
except Exception as e:
logger.error(f"Error processing action result documents: {str(e)}")
@@ -79,11 +75,7 @@ class GenerationService:
Returns a list of created document objects with proper workflow context.
"""
try:
- logger.info(f"Creating documents from action result for {action.execMethod}.{action.execAction}")
- logger.info(f"Action result documents count: {len(action_result.documents) if action_result.documents else 0}")
-
processed_docs = self.processActionResultDocuments(action_result, action, workflow)
- logger.info(f"Processed {len(processed_docs)} documents")
created_documents = []
for i, doc_data in enumerate(processed_docs):
@@ -92,8 +84,6 @@ class GenerationService:
document_data = doc_data['content']
mime_type = doc_data['mimeType']
- logger.info(f"Creating document {i+1}: {document_name} (mime: {mime_type}, content length: {len(str(document_data))})")
-
# Convert document data to string content
content = convertDocumentDataToString(document_data, getFileExtension(document_name))
@@ -103,8 +93,6 @@ class GenerationService:
logger.warning(f"Empty or minimal content for document {document_name}, skipping")
continue
- logger.info(f"Document {document_name} has content: {len(content)} characters")
-
# Normalize file extension based on mime type if missing or incorrect
try:
mime_to_ext = {
@@ -154,14 +142,12 @@ class GenerationService:
# Set workflow context on the document if possible
self._setDocumentWorkflowContext(document, action, workflow)
created_documents.append(document)
- logger.info(f"Successfully created ChatDocument: {document_name} (ID: {document.id if hasattr(document, 'id') else 'N/A'}, fileId: {document.fileId if hasattr(document, 'fileId') else 'N/A'})")
else:
logger.error(f"Failed to create ChatDocument object for {document_name}")
except Exception as e:
logger.error(f"Error creating document {doc_data.get('fileName', 'unknown')}: {str(e)}")
continue
- logger.info(f"Successfully created {len(created_documents)} documents")
return created_documents
except Exception as e:
logger.error(f"Error creating documents from action result: {str(e)}")
@@ -194,8 +180,6 @@ class GenerationService:
if hasattr(document, 'workflowStatus'):
document.workflowStatus = workflow_stats.get('workflowStatus', workflow.status if hasattr(workflow, 'status') else 'unknown')
- logger.debug(f"Set workflow context on document: Round {current_round}, Task {current_task}, Action {current_action}")
- logger.debug(f"Document workflow metadata: ID={document.workflowId if hasattr(document, 'workflowId') else 'N/A'}, Status={document.workflowStatus if hasattr(document, 'workflowStatus') else 'N/A'}")
except Exception as e:
logger.warning(f"Could not set workflow context on document: {str(e)}")
@@ -343,9 +327,7 @@ class GenerationService:
# Render the JSON content directly (AI generation handled by main service)
renderedContent, mimeType = await renderer.render(contentToRender, title, userPrompt, aiService)
- # Remove extra debug output file writes
- logger.info(f"Successfully rendered JSON report to {outputFormat} format: {len(renderedContent)} characters")
return renderedContent, mimeType
except Exception as e:
diff --git a/modules/services/serviceGeneration/renderers/registry.py b/modules/services/serviceGeneration/renderers/registry.py
index 4c9032a4..bb890a82 100644
--- a/modules/services/serviceGeneration/renderers/registry.py
+++ b/modules/services/serviceGeneration/renderers/registry.py
@@ -57,14 +57,12 @@ class RendererRegistry:
# Register the renderer
self._register_renderer_class(attr)
- logger.info(f"Discovered renderer: {attr.__name__} from {module_name}")
except Exception as e:
logger.warning(f"Could not load renderer from {module_name}: {str(e)}")
continue
self._discovered = True
- logger.info(f"Renderer discovery completed. Found {len(self._renderers)} renderers.")
except Exception as e:
logger.error(f"Error during renderer discovery: {str(e)}")
diff --git a/modules/services/serviceGeneration/subPromptBuilderGeneration.py b/modules/services/serviceGeneration/subPromptBuilderGeneration.py
index 895d9af8..ae744664 100644
--- a/modules/services/serviceGeneration/subPromptBuilderGeneration.py
+++ b/modules/services/serviceGeneration/subPromptBuilderGeneration.py
@@ -184,7 +184,7 @@ JSON structure template (reference only - shows the pattern):
Instructions:
- Start your response with {{"metadata": ...}} - return COMPLETE JSON from the beginning
- Do NOT continue from the template examples above - create your own sections
-- Generate content based on the user request
+- Generate complete content based on the user request
- Use the element structures shown in the template (heading, paragraph, list, table, code)
- Create your own section IDs (do not use the example IDs like "section_heading_example")
- When fully complete, add "complete_response": true at root level
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index 4db3ce0f..a38be0c7 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -529,7 +529,6 @@ class WorkflowService:
logger.error(f"Failed to store workflow stat: {e}")
raise
-
def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
"""Update message by delegating to the chat interface"""
try:
diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py
index b656916b..c2250c59 100644
--- a/modules/workflows/methods/methodAi.py
+++ b/modules/workflows/methods/methodAi.py
@@ -46,23 +46,12 @@ class MethodAi(MethodBase):
operationId = f"ai_process_{workflowId}_{int(time.time())}"
# Start progress tracking
- if hasattr(self.services, 'workflow') and self.services.workflow: # TODO: Entfernen für PROD! (block)
- try:
- self.services.workflow.progressLogStart(
+ self.services.workflow.progressLogStart(
operationId,
"Generate",
"AI Processing",
f"Format: {parameters.get('resultType', 'txt')}"
)
- except Exception as e:
- # Silently skip progress tracking errors (e.g., in test environments)
- logger.debug(f"Skipping progress logging: {str(e)}")
-
-
- # Debug logging to see what parameters are received
- logger.info(f"MethodAi.process received parameters: {parameters}")
- logger.info(f"Parameters type: {type(parameters)}")
- logger.info(f"Parameters keys: {list(parameters.keys()) if isinstance(parameters, dict) else 'Not a dict'}")
aiPrompt = parameters.get("aiPrompt")
logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")
@@ -132,27 +121,25 @@ class MethodAi(MethodBase):
mimeType=d.get("mimeType") or output_mime_type
))
- # Complete progress tracking
- self.services.workflow.progressLogFinish(operationId, True)
-
- return ActionResult.isSuccess(documents=action_documents)
-
- extension = output_extension.lstrip('.')
- meaningful_name = self._generateMeaningfulFileName(
- base_name="ai",
- extension=extension,
- action_name="result"
- )
- action_document = ActionDocument(
- documentName=meaningful_name,
- documentData=result,
- mimeType=output_mime_type
- )
+ final_documents = action_documents
+ else:
+ extension = output_extension.lstrip('.')
+ meaningful_name = self._generateMeaningfulFileName(
+ base_name="ai",
+ extension=extension,
+ action_name="result"
+ )
+ action_document = ActionDocument(
+ documentName=meaningful_name,
+ documentData=result,
+ mimeType=output_mime_type
+ )
+ final_documents = [action_document]
# Complete progress tracking
self.services.workflow.progressLogFinish(operationId, True)
- return ActionResult.isSuccess(documents=[action_document])
+ return ActionResult.isSuccess(documents=final_documents)
except Exception as e:
logger.error(f"Error in AI processing: {str(e)}")
diff --git a/test4_method_ai_operations.py b/test4_method_ai_operations.py
index dc09ea9a..4de11ece 100644
--- a/test4_method_ai_operations.py
+++ b/test4_method_ai_operations.py
@@ -50,8 +50,9 @@ class MethodAiOperationsTester:
"resultType": "json"
},
OperationTypeEnum.DATA_GENERATE: {
- "aiPrompt": "Generate the first 4000 prime numbers.",
- "resultType": "txt"
+ # "aiPrompt": "Generate the first 1000 prime numbers.",
+ "aiPrompt": "Schreibe einen Bericht über die Verhaltensweisen in Finnland's Saunas in 50 Kapiteln",
+ "resultType": "docx"
},
OperationTypeEnum.DATA_EXTRACT: {
"aiPrompt": "Extract all email addresses and phone numbers from the following text: 'Contact us at support@example.com or call 123-456-7890. For sales, email sales@example.com or call 987-654-3210.'",
@@ -60,7 +61,9 @@ class MethodAiOperationsTester:
OperationTypeEnum.IMAGE_ANALYSE: {
"aiPrompt": "Analyze this image and describe what you see, including any text or numbers visible.",
"resultType": "json",
- "documentList": ["_testdata_photo_2025-06-03_13-05-52.jpg"] if os.path.exists(os.path.join(self.logsDir, "_testdata_photo_2025-06-03_13-05-52.jpg")) else []
+ # documentList should contain document references resolvable by workflow service
+ # For testing, leave empty if no test image is available
+ "documentList": []
},
OperationTypeEnum.IMAGE_GENERATE: {
"aiPrompt": "A beautiful sunset over the ocean with purple and orange hues",
@@ -289,8 +292,8 @@ class MethodAiOperationsTester:
print(f"{'='*80}")
print("Testing DATA_GENERATE operation type...")
- # Test only DATA_GENERATE
- await self.testOperation(OperationTypeEnum.DATA_GENERATE)
+ # Test only ONE operation type TODO
+ await self.testOperation(OperationTypeEnum.IMAGE_ANALYSE)
print(f"\n{'─'*80}")
# Print summary