ui progress tracking integrated

This commit is contained in:
ValueOn AG 2025-10-29 23:40:39 +01:00
parent adbc29f069
commit 26b2109844
8 changed files with 243 additions and 228 deletions

View file

@ -126,7 +126,8 @@ Respond with ONLY a JSON object in this exact format:
options: AiCallOptions, options: AiCallOptions,
debugPrefix: str = "ai_call", debugPrefix: str = "ai_call",
promptBuilder: Optional[callable] = None, promptBuilder: Optional[callable] = None,
promptArgs: Optional[Dict[str, Any]] = None promptArgs: Optional[Dict[str, Any]] = None,
operationId: Optional[str] = None
) -> str: ) -> str:
""" """
Shared core function for AI calls with repair-based looping system. Shared core function for AI calls with repair-based looping system.
@ -136,6 +137,9 @@ Respond with ONLY a JSON object in this exact format:
prompt: The prompt to send to AI prompt: The prompt to send to AI
options: AI call configuration options options: AI call configuration options
debugPrefix: Prefix for debug file names debugPrefix: Prefix for debug file names
promptBuilder: Optional function to rebuild prompts for continuation
promptArgs: Optional arguments for prompt builder
operationId: Optional operation ID for progress tracking
Returns: Returns:
Complete AI response after all iterations Complete AI response after all iterations
@ -145,31 +149,35 @@ Respond with ONLY a JSON object in this exact format:
allSections = [] # Accumulate all sections across iterations allSections = [] # Accumulate all sections across iterations
lastRawResponse = None # Store last raw JSON response for continuation lastRawResponse = None # Store last raw JSON response for continuation
logger.debug(f"Starting AI call with repair-based looping (debug prefix: {debugPrefix})")
while iteration < max_iterations: while iteration < max_iterations:
iteration += 1 iteration += 1
logger.debug(f"AI call iteration {iteration}/{max_iterations}")
# Update progress for iteration start
if operationId:
if iteration == 1:
self.services.workflow.progressLogUpdate(operationId, 0.5, f"Starting AI call iteration {iteration}")
else:
# For continuation iterations, show progress incrementally
base_progress = 0.5 + (min(iteration - 1, max_iterations) / max_iterations * 0.4) # Progress from 0.5 to 0.9 over max_iterations iterations
self.services.workflow.progressLogUpdate(operationId, base_progress, f"Continuing generation (iteration {iteration})")
# Build iteration prompt # Build iteration prompt
if len(allSections) > 0 and promptBuilder and promptArgs: if len(allSections) > 0 and promptBuilder and promptArgs:
# This is a continuation - build continuation context with raw JSON and rebuild prompt # This is a continuation - build continuation context with raw JSON and rebuild prompt
continuationContext = buildContinuationContext(allSections, lastRawResponse) continuationContext = buildContinuationContext(allSections, lastRawResponse)
logger.info(f"Continuation context: {continuationContext.get('section_count')} sections") if not lastRawResponse:
if lastRawResponse:
logger.debug(f"Iteration {iteration}: Including previous response in continuation context ({len(lastRawResponse)} chars)")
else:
logger.warning(f"Iteration {iteration}: No previous response available for continuation!") logger.warning(f"Iteration {iteration}: No previous response available for continuation!")
# Rebuild prompt with continuation context using the provided prompt builder # Rebuild prompt with continuation context using the provided prompt builder
iterationPrompt = await promptBuilder(**promptArgs, continuationContext=continuationContext) iterationPrompt = await promptBuilder(**promptArgs, continuationContext=continuationContext)
logger.debug(f"Rebuilt prompt with continuation context for iteration {iteration}")
else: else:
# First iteration - use original prompt # First iteration - use original prompt
iterationPrompt = prompt iterationPrompt = prompt
# Make AI call # Make AI call
try: try:
if operationId and iteration == 1:
self.services.workflow.progressLogUpdate(operationId, 0.51, "Calling AI model")
from modules.datamodels.datamodelAi import AiCallRequest from modules.datamodels.datamodelAi import AiCallRequest
request = AiCallRequest( request = AiCallRequest(
prompt=iterationPrompt, prompt=iterationPrompt,
@ -186,12 +194,13 @@ Respond with ONLY a JSON object in this exact format:
response = await self.aiObjects.call(request) response = await self.aiObjects.call(request)
result = response.content result = response.content
# Debug: Check response immediately from API # Update progress after AI call
if iteration == 1 and result: if operationId:
first_chars = result[:200].replace('\n', '\\n').replace('\r', '\\r') if iteration == 1:
logger.debug(f"Iteration 1: Raw API response starts with (first 200 chars): '{first_chars}'") self.services.workflow.progressLogUpdate(operationId, 0.6, f"AI response received (iteration {iteration})")
if result.strip().startswith('},') or result.strip().startswith('],'): else:
logger.error(f"Iteration 1: API returned fragment! Full start: '{result[:200]}'") progress = 0.6 + (min(iteration - 1, 10) * 0.03)
self.services.workflow.progressLogUpdate(operationId, progress, f"Processing response (iteration {iteration})")
# Write raw AI response to debug file # Write raw AI response to debug file
if iteration == 1: if iteration == 1:
@ -216,11 +225,16 @@ Respond with ONLY a JSON object in this exact format:
# Check for complete_response flag in raw response (before parsing) # Check for complete_response flag in raw response (before parsing)
import re import re
if re.search(r'"complete_response"\s*:\s*true', result, re.IGNORECASE): if re.search(r'"complete_response"\s*:\s*true', result, re.IGNORECASE):
logger.info(f"Iteration {iteration}: Detected complete_response flag in raw response") pass # Flag detected, will stop in _shouldContinueGeneration
# Extract sections from response (handles both valid and broken JSON) # Extract sections from response (handles both valid and broken JSON)
extractedSections, wasJsonComplete = self._extractSectionsFromResponse(result, iteration, debugPrefix) extractedSections, wasJsonComplete = self._extractSectionsFromResponse(result, iteration, debugPrefix)
# Update progress after parsing
if operationId:
if extractedSections:
self.services.workflow.progressLogUpdate(operationId, 0.65 + (min(iteration - 1, 10) * 0.025), f"Extracted {len(extractedSections)} sections (iteration {iteration})")
if not extractedSections: if not extractedSections:
# If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry # If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry
if iteration > 1 and not wasJsonComplete: if iteration > 1 and not wasJsonComplete:
@ -232,15 +246,14 @@ Respond with ONLY a JSON object in this exact format:
# Add new sections to accumulator # Add new sections to accumulator
allSections.extend(extractedSections) allSections.extend(extractedSections)
logger.info(f"Iteration {iteration}: Extracted {len(extractedSections)} sections (total: {len(allSections)})")
# Check if we should continue (completion detection) # Check if we should continue (completion detection)
if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result): if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result):
logger.debug(f"Iteration {iteration}: Continuing generation")
continue continue
else: else:
# Done - build final result # Done - build final result
logger.info(f"Iteration {iteration}: Generation complete") if operationId:
self.services.workflow.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)")
break break
except Exception as e: except Exception as e:
@ -256,7 +269,6 @@ Respond with ONLY a JSON object in this exact format:
# Write final result to debug file # Write final result to debug file
self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result") self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
logger.info(f"AI call completed: {len(allSections)} total sections from {iteration} iterations")
return final_result return final_result
def _extractSectionsFromResponse( def _extractSectionsFromResponse(
@ -278,12 +290,9 @@ Respond with ONLY a JSON object in this exact format:
# Check if AI marked response as complete # Check if AI marked response as complete
isComplete = parsed_result.get("complete_response", False) == True isComplete = parsed_result.get("complete_response", False) == True
if isComplete:
logger.info(f"Iteration {iteration}: AI marked response as complete (complete_response: true)")
# Extract sections from parsed JSON # Extract sections from parsed JSON
sections = extractSectionsFromDocument(parsed_result) sections = extractSectionsFromDocument(parsed_result)
logger.debug(f"Iteration {iteration}: Valid JSON - extracted {len(sections)} sections")
# If AI marked as complete, always return as complete # If AI marked as complete, always return as complete
if isComplete: if isComplete:
@ -291,9 +300,7 @@ Respond with ONLY a JSON object in this exact format:
# If in continuation mode (iteration > 1), continuation responses are expected to be fragments # If in continuation mode (iteration > 1), continuation responses are expected to be fragments
# A fragment with 0 extractable sections means JSON is incomplete - need another iteration # A fragment with 0 extractable sections means JSON is incomplete - need another iteration
# Don't use repair mechanism - just mark as incomplete so loop continues
if len(sections) == 0 and iteration > 1: if len(sections) == 0 and iteration > 1:
logger.info(f"Iteration {iteration}: Continuation fragment with 0 extractable sections - JSON incomplete, continuing")
return sections, False # Mark as incomplete so loop continues return sections, False # Mark as incomplete so loop continues
# First iteration with 0 sections means empty response - stop # First iteration with 0 sections means empty response - stop
@ -304,7 +311,6 @@ Respond with ONLY a JSON object in this exact format:
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
# Broken JSON - try repair mechanism (normal in iterative generation) # Broken JSON - try repair mechanism (normal in iterative generation)
logger.info(f"Iteration {iteration}: JSON incomplete/broken, attempting repair: {str(e)}")
self.services.utils.writeDebugFile(result, f"{debugPrefix}_broken_json_iteration_{iteration}") self.services.utils.writeDebugFile(result, f"{debugPrefix}_broken_json_iteration_{iteration}")
# Try to repair # Try to repair
@ -313,7 +319,6 @@ Respond with ONLY a JSON object in this exact format:
if repaired_json: if repaired_json:
# Extract sections from repaired JSON # Extract sections from repaired JSON
sections = extractSectionsFromDocument(repaired_json) sections = extractSectionsFromDocument(repaired_json)
logger.info(f"Iteration {iteration}: Repaired JSON - extracted {len(sections)} sections")
return sections, False # JSON was broken but repaired return sections, False # JSON was broken but repaired
else: else:
# Repair failed - log error # Repair failed - log error
@ -341,18 +346,14 @@ Respond with ONLY a JSON object in this exact format:
# Check for complete_response flag in raw response # Check for complete_response flag in raw response
if rawResponse: if rawResponse:
import re import re
# Look for complete_response: true pattern (allowing for whitespace variations)
if re.search(r'"complete_response"\s*:\s*true', rawResponse, re.IGNORECASE): if re.search(r'"complete_response"\s*:\s*true', rawResponse, re.IGNORECASE):
logger.info("AI marked response as complete (complete_response: true) - stopping generation")
return False return False
# If JSON was complete (and no complete_response flag), we're done # If JSON was complete (and no complete_response flag), we're done
# If JSON was broken and repaired, continue to get more content # If JSON was broken and repaired, continue to get more content
if wasJsonComplete: if wasJsonComplete:
logger.info("JSON was complete - stopping generation")
return False return False
else: else:
logger.info("JSON was broken/repaired - continuing generation")
return True return True
def _buildFinalResultFromSections( def _buildFinalResultFromSections(
@ -407,7 +408,6 @@ Respond with ONLY a JSON object in this exact format:
Planning JSON response Planning JSON response
""" """
# Planning calls always use static parameters # Planning calls always use static parameters
logger.debug("Using static parameters for planning call")
options = AiCallOptions( options = AiCallOptions(
operationType=OperationTypeEnum.PLAN, operationType=OperationTypeEnum.PLAN,
priority=PriorityEnum.QUALITY, priority=PriorityEnum.QUALITY,
@ -449,110 +449,138 @@ Respond with ONLY a JSON object in this exact format:
Returns: Returns:
AI response as string, or dict with documents if outputFormat is specified AI response as string, or dict with documents if outputFormat is specified
""" """
if options is None or (hasattr(options, 'operationType') and options.operationType is None): # Create separate operationId for detailed progress tracking
# Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None) import time
logger.debug("Analyzing prompt to determine optimal parameters") import uuid
options = await self._analyzePromptAndCreateOptions(prompt) workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
else: aiOperationId = f"ai_documents_{workflowId}_{int(time.time())}"
logger.debug(f"Using provided options: operationType={options.operationType}, priority={options.priority}")
# CRITICAL: For document generation with JSON templates, NEVER compress the prompt # Start progress tracking for this operation
# Compressing would truncate the template structure and confuse the AI self.services.workflow.progressLogStart(
if outputFormat: # Document generation with structured output aiOperationId,
if not options: "AI call with documents",
options = AiCallOptions() "Document Generation",
options.compressPrompt = False # JSON templates must NOT be truncated f"Format: {outputFormat or 'text'}"
options.compressContext = False # Context also should not be compressed )
logger.debug("Document generation detected - disabled prompt/context compression")
# Handle document generation with specific output format using unified approach try:
if outputFormat: if options is None or (hasattr(options, 'operationType') and options.operationType is None):
# Use unified generation method for all document generation # Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None)
if documents and len(documents) > 0: self.services.workflow.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters")
logger.debug(f"Extracting content from {len(documents)} documents") options = await self._analyzePromptAndCreateOptions(prompt)
extracted_content = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
else:
logger.debug("No documents provided - using direct generation")
extracted_content = None
logger.debug(f"[DEBUG] title value: {title}, type: {type(title)}")
from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
# First call without continuation context
generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None)
# Prepare prompt builder arguments for continuation # CRITICAL: For document generation with JSON templates, NEVER compress the prompt
promptArgs = { # Compressing would truncate the template structure and confuse the AI
"outputFormat": outputFormat, if outputFormat: # Document generation with structured output
"userPrompt": prompt, if not options:
"title": title, options = AiCallOptions()
"extracted_content": extracted_content options.compressPrompt = False # JSON templates must NOT be truncated
} options.compressContext = False # Context also should not be compressed
generated_json = await self._callAiWithLooping( # Handle document generation with specific output format using unified approach
generation_prompt, if outputFormat:
options, # Use unified generation method for all document generation
"document_generation", if documents and len(documents) > 0:
buildGenerationPrompt, self.services.workflow.progressLogUpdate(aiOperationId, 0.2, f"Extracting content from {len(documents)} documents")
promptArgs extracted_content = await self.services.ai.documentProcessor.callAiText(prompt, documents, options, aiOperationId)
) else:
self.services.workflow.progressLogUpdate(aiOperationId, 0.2, "Preparing for direct generation")
# Parse the generated JSON (extract fenced/embedded JSON first) extracted_content = None
try:
extracted_json = self.services.utils.jsonExtractString(generated_json)
generated_data = json.loads(extracted_json)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse generated JSON: {str(e)}")
logger.error(f"JSON content length: {len(generated_json)}")
logger.error(f"JSON content preview (last 200 chars): ...{generated_json[-200:]}")
logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}")
# Write the problematic JSON to debug file self.services.workflow.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt")
self.services.utils.writeDebugFile(generated_json, "failed_json_parsing") from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
# First call without continuation context
generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None)
return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"} # Prepare prompt builder arguments for continuation
promptArgs = {
# Render to final format using the existing renderer "outputFormat": outputFormat,
try: "userPrompt": prompt,
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
rendered_content, mime_type = await generationService.renderReport(
generated_data, outputFormat, title or "Generated Document", prompt, self
)
# Build result in the expected format
result = {
"success": True,
"content": generated_data,
"documents": [{
"documentName": f"generated.{outputFormat}",
"documentData": rendered_content,
"mimeType": mime_type,
"title": title or "Generated Document"
}],
"is_multi_file": False,
"format": outputFormat,
"title": title, "title": title,
"split_strategy": "single", "extracted_content": extracted_content
"total_documents": 1,
"processed_documents": 1
} }
# Log AI response for debugging self.services.workflow.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation")
self.services.utils.writeDebugFile(str(result), "document_generation_response", documents) generated_json = await self._callAiWithLooping(
return result generation_prompt,
options,
except Exception as e: "document_generation",
logger.error(f"Error rendering document: {str(e)}") buildGenerationPrompt,
return {"success": False, "error": f"Rendering failed: {str(e)}"} promptArgs,
aiOperationId
# Handle text calls (no output format specified) )
if documents:
# Use document processing for text calls with documents self.services.workflow.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON")
result = await self.services.ai.documentProcessor.callAiText(prompt, documents, options) # Parse the generated JSON (extract fenced/embedded JSON first)
else: try:
# Use shared core function for direct text calls extracted_json = self.services.utils.jsonExtractString(generated_json)
result = await self._callAiWithLooping(prompt, options, "text") generated_data = json.loads(extracted_json)
except json.JSONDecodeError as e:
return result logger.error(f"Failed to parse generated JSON: {str(e)}")
logger.error(f"JSON content length: {len(generated_json)}")
logger.error(f"JSON content preview (last 200 chars): ...{generated_json[-200:]}")
logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}")
# Write the problematic JSON to debug file
self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
self.services.workflow.progressLogFinish(aiOperationId, False)
return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}
self.services.workflow.progressLogUpdate(aiOperationId, 0.8, f"Rendering to {outputFormat} format")
# Render to final format using the existing renderer
try:
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
rendered_content, mime_type = await generationService.renderReport(
generated_data, outputFormat, title or "Generated Document", prompt, self
)
# Build result in the expected format
result = {
"success": True,
"content": generated_data,
"documents": [{
"documentName": f"generated.{outputFormat}",
"documentData": rendered_content,
"mimeType": mime_type,
"title": title or "Generated Document"
}],
"is_multi_file": False,
"format": outputFormat,
"title": title,
"split_strategy": "single",
"total_documents": 1,
"processed_documents": 1
}
# Log AI response for debugging
self.services.utils.writeDebugFile(str(result), "document_generation_response", documents)
self.services.workflow.progressLogFinish(aiOperationId, True)
return result
except Exception as e:
logger.error(f"Error rendering document: {str(e)}")
self.services.workflow.progressLogFinish(aiOperationId, False)
return {"success": False, "error": f"Rendering failed: {str(e)}"}
# Handle text calls (no output format specified)
self.services.workflow.progressLogUpdate(aiOperationId, 0.5, "Processing text call")
if documents:
# Use document processing for text calls with documents
result = await self.services.ai.documentProcessor.callAiText(prompt, documents, options, aiOperationId)
else:
# Use shared core function for direct text calls
result = await self._callAiWithLooping(prompt, options, "text", None, None, aiOperationId)
self.services.workflow.progressLogFinish(aiOperationId, True)
return result
except Exception as e:
logger.error(f"Error in callAiDocuments: {str(e)}")
self.services.workflow.progressLogFinish(aiOperationId, False)
raise
# AI Image Analysis # AI Image Analysis
@ -568,12 +596,9 @@ Respond with ONLY a JSON object in this exact format:
# Check if imageData is valid # Check if imageData is valid
if not imageData: if not imageData:
error_msg = "No image data provided" error_msg = "No image data provided"
self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {error_msg}") logger.error(f"Error in AI image analysis: {error_msg}")
return f"Error: {error_msg}" return f"Error: {error_msg}"
self.services.utils.debugLogToFile(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}", "AI_SERVICE")
logger.info(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}")
# Always use IMAGE_ANALYSE operation type for image processing # Always use IMAGE_ANALYSE operation type for image processing
if options is None: if options is None:
@ -612,34 +637,17 @@ Respond with ONLY a JSON object in this exact format:
contentParts=[imagePart] contentParts=[imagePart]
) )
self.services.utils.debugLogToFile(f"Calling aiObjects.call() with operationType: {options.operationType}", "AI_SERVICE")
logger.info(f"Calling aiObjects.call() with operationType: {options.operationType}")
# Write image analysis prompt to debug file
self.services.utils.writeDebugFile(prompt, "image_analysis_prompt")
response = await self.aiObjects.call(request) response = await self.aiObjects.call(request)
# Write image analysis response to debug file
# response is an AiCallResponse object
result = response.content result = response.content
self.services.utils.writeDebugFile(result, "image_analysis_response")
# Debug the result
self.services.utils.debugLogToFile(f"AI image analysis result type: {type(response)}, content length: {len(result)}", "AI_SERVICE")
# Check if result is valid # Check if result is valid
if not result or (isinstance(result, str) and not result.strip()): if not result or (isinstance(result, str) and not result.strip()):
error_msg = f"No response from AI image analysis (result: {repr(result)})" error_msg = f"No response from AI image analysis (result: {repr(result)})"
self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {error_msg}") logger.error(f"Error in AI image analysis: {error_msg}")
return f"Error: {error_msg}" return f"Error: {error_msg}"
self.services.utils.debugLogToFile(f"callImage returned: {result[:200]}..." if len(result) > 200 else result, "AI_SERVICE")
logger.info(f"callImage returned: {result[:200]}..." if len(result) > 200 else result)
return result return result
except Exception as e: except Exception as e:
self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {str(e)}") logger.error(f"Error in AI image analysis: {str(e)}")
return f"Error: {str(e)}" return f"Error: {str(e)}"

View file

@ -38,7 +38,8 @@ class SubDocumentProcessing:
self, self,
documents: List[ChatDocument], documents: List[ChatDocument],
prompt: str, prompt: str,
options: Optional[AiCallOptions] = None options: Optional[AiCallOptions] = None,
operationId: Optional[str] = None
) -> str: ) -> str:
""" """
Process documents with model-aware chunking and merge results. Process documents with model-aware chunking and merge results.
@ -48,6 +49,7 @@ class SubDocumentProcessing:
documents: List of ChatDocument objects to process documents: List of ChatDocument objects to process
prompt: AI prompt for processing prompt: AI prompt for processing
options: AI call options options: AI call options
operationId: Optional operation ID for progress tracking
Returns: Returns:
Merged AI results as string with preserved document structure Merged AI results as string with preserved document structure
@ -55,45 +57,69 @@ class SubDocumentProcessing:
if not documents: if not documents:
return "" return ""
# Build extraction options using Pydantic model # Create operationId if not provided
mergeStrategy = MergeStrategy( if not operationId:
useIntelligentMerging=True, import time
prompt=prompt, workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
groupBy="typeGroup", operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
orderBy="id", self.services.workflow.progressLogStart(
mergeType="concatenate" operationId,
) "AI Text Extract",
"Document Processing",
extractionOptions = ExtractionOptions( f"Processing {len(documents)} documents"
prompt=prompt, )
operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
processDocumentsIndividually=True,
mergeStrategy=mergeStrategy
)
logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
try: try:
# Build extraction options using Pydantic model
mergeStrategy = MergeStrategy(
useIntelligentMerging=True,
prompt=prompt,
groupBy="typeGroup",
orderBy="id",
mergeType="concatenate"
)
extractionOptions = ExtractionOptions(
prompt=prompt,
operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
processDocumentsIndividually=True,
mergeStrategy=mergeStrategy
)
logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")
# Extract content WITHOUT chunking # Extract content WITHOUT chunking
if operationId:
self.services.workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
extractionResult = self.extractionService.extractContent(documents, extractionOptions) extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list): if not isinstance(extractionResult, list):
if operationId:
self.services.workflow.progressLogFinish(operationId, False)
return "[Error: No extraction results]" return "[Error: No extraction results]"
# Process parts (not chunks) with model-aware AI calls # Process parts (not chunks) with model-aware AI calls
partResults = await self._processPartsWithMapping(extractionResult, prompt, options) if operationId:
self.services.workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
partResults = await self._processPartsWithMapping(extractionResult, prompt, options, operationId)
# Merge results using existing merging system # Merge results using existing merging system
if operationId:
self.services.workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
mergedContent = self._mergePartResults(partResults, options) mergedContent = self._mergePartResults(partResults, options)
# Save merged extraction content to debug # Save merged extraction content to debug
self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text") self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
return mergedContent if operationId:
self.services.workflow.progressLogFinish(operationId, True)
return mergedContent
except Exception as e: except Exception as e:
logger.error(f"Error in per-chunk processing: {str(e)}") logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
return f"[Error in per-chunk processing: {str(e)}]" if operationId:
self.services.workflow.progressLogFinish(operationId, False)
raise
async def processDocumentsPerChunkJson( async def processDocumentsPerChunkJson(
self, self,
@ -466,19 +492,21 @@ CONTINUATION INSTRUCTIONS:
self, self,
prompt: str, prompt: str,
documents: Optional[List[ChatDocument]], documents: Optional[List[ChatDocument]],
options: AiCallOptions options: AiCallOptions,
operationId: Optional[str] = None
) -> str: ) -> str:
""" """
Handle text calls with document processing through ExtractionService. Handle text calls with document processing through ExtractionService.
UNIFIED PROCESSING: Always use per-chunk processing for consistency. UNIFIED PROCESSING: Always use per-chunk processing for consistency.
""" """
return await self.processDocumentsPerChunk(documents, prompt, options) return await self.processDocumentsPerChunk(documents, prompt, options, operationId)
async def _processPartsWithMapping( async def _processPartsWithMapping(
self, self,
extractionResult: List[ContentExtracted], extractionResult: List[ContentExtracted],
prompt: str, prompt: str,
options: Optional[AiCallOptions] = None options: Optional[AiCallOptions] = None,
operationId: Optional[str] = None
) -> List['PartResult']: ) -> List['PartResult']:
"""Process content parts with model-aware chunking and proper mapping.""" """Process content parts with model-aware chunking and proper mapping."""
from modules.datamodels.datamodelExtraction import PartResult from modules.datamodels.datamodelExtraction import PartResult
@ -505,7 +533,11 @@ CONTINUATION INSTRUCTIONS:
logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking") logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")
total_parts = len(parts_to_process)
# Process parts in parallel # Process parts in parallel
processed_count = [0] # Use list to allow modification in nested function
async def process_single_part(part_info: Dict) -> PartResult: async def process_single_part(part_info: Dict) -> PartResult:
part = part_info['part'] part = part_info['part']
part_index = part_info['part_index'] part_index = part_info['part_index']
@ -523,6 +555,12 @@ CONTINUATION INSTRUCTIONS:
contentParts=[part] # Pass as list for unified processing contentParts=[part] # Pass as list for unified processing
) )
# Update progress before AI call
if operationId and total_parts > 0:
processed_count[0] += 1
progress = 0.3 + (processed_count[0] / total_parts * 0.6) # Progress from 0.3 to 0.9
self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processed_count[0]}/{total_parts}")
# Call AI with model-aware chunking # Call AI with model-aware chunking
response = await self.aiObjects.call(request) response = await self.aiObjects.call(request)

View file

@ -34,11 +34,8 @@ class GenerationService:
documents = action_result.documents if action_result and hasattr(action_result, 'documents') else [] documents = action_result.documents if action_result and hasattr(action_result, 'documents') else []
if not documents: if not documents:
logger.info(f"No documents found in action_result.documents for {action.execMethod}.{action.execAction}")
return [] return []
logger.info(f"Processing {len(documents)} documents from action_result.documents")
# Process each document from the AI action result # Process each document from the AI action result
processed_documents = [] processed_documents = []
for doc in documents: for doc in documents:
@ -46,7 +43,6 @@ class GenerationService:
if processed_doc: if processed_doc:
processed_documents.append(processed_doc) processed_documents.append(processed_doc)
logger.info(f"Successfully processed {len(processed_documents)} documents")
return processed_documents return processed_documents
except Exception as e: except Exception as e:
logger.error(f"Error processing action result documents: {str(e)}") logger.error(f"Error processing action result documents: {str(e)}")
@ -79,11 +75,7 @@ class GenerationService:
Returns a list of created document objects with proper workflow context. Returns a list of created document objects with proper workflow context.
""" """
try: try:
logger.info(f"Creating documents from action result for {action.execMethod}.{action.execAction}")
logger.info(f"Action result documents count: {len(action_result.documents) if action_result.documents else 0}")
processed_docs = self.processActionResultDocuments(action_result, action, workflow) processed_docs = self.processActionResultDocuments(action_result, action, workflow)
logger.info(f"Processed {len(processed_docs)} documents")
created_documents = [] created_documents = []
for i, doc_data in enumerate(processed_docs): for i, doc_data in enumerate(processed_docs):
@ -92,8 +84,6 @@ class GenerationService:
document_data = doc_data['content'] document_data = doc_data['content']
mime_type = doc_data['mimeType'] mime_type = doc_data['mimeType']
logger.info(f"Creating document {i+1}: {document_name} (mime: {mime_type}, content length: {len(str(document_data))})")
# Convert document data to string content # Convert document data to string content
content = convertDocumentDataToString(document_data, getFileExtension(document_name)) content = convertDocumentDataToString(document_data, getFileExtension(document_name))
@ -103,8 +93,6 @@ class GenerationService:
logger.warning(f"Empty or minimal content for document {document_name}, skipping") logger.warning(f"Empty or minimal content for document {document_name}, skipping")
continue continue
logger.info(f"Document {document_name} has content: {len(content)} characters")
# Normalize file extension based on mime type if missing or incorrect # Normalize file extension based on mime type if missing or incorrect
try: try:
mime_to_ext = { mime_to_ext = {
@ -154,14 +142,12 @@ class GenerationService:
# Set workflow context on the document if possible # Set workflow context on the document if possible
self._setDocumentWorkflowContext(document, action, workflow) self._setDocumentWorkflowContext(document, action, workflow)
created_documents.append(document) created_documents.append(document)
logger.info(f"Successfully created ChatDocument: {document_name} (ID: {document.id if hasattr(document, 'id') else 'N/A'}, fileId: {document.fileId if hasattr(document, 'fileId') else 'N/A'})")
else: else:
logger.error(f"Failed to create ChatDocument object for {document_name}") logger.error(f"Failed to create ChatDocument object for {document_name}")
except Exception as e: except Exception as e:
logger.error(f"Error creating document {doc_data.get('fileName', 'unknown')}: {str(e)}") logger.error(f"Error creating document {doc_data.get('fileName', 'unknown')}: {str(e)}")
continue continue
logger.info(f"Successfully created {len(created_documents)} documents")
return created_documents return created_documents
except Exception as e: except Exception as e:
logger.error(f"Error creating documents from action result: {str(e)}") logger.error(f"Error creating documents from action result: {str(e)}")
@ -194,8 +180,6 @@ class GenerationService:
if hasattr(document, 'workflowStatus'): if hasattr(document, 'workflowStatus'):
document.workflowStatus = workflow_stats.get('workflowStatus', workflow.status if hasattr(workflow, 'status') else 'unknown') document.workflowStatus = workflow_stats.get('workflowStatus', workflow.status if hasattr(workflow, 'status') else 'unknown')
logger.debug(f"Set workflow context on document: Round {current_round}, Task {current_task}, Action {current_action}")
logger.debug(f"Document workflow metadata: ID={document.workflowId if hasattr(document, 'workflowId') else 'N/A'}, Status={document.workflowStatus if hasattr(document, 'workflowStatus') else 'N/A'}")
except Exception as e: except Exception as e:
logger.warning(f"Could not set workflow context on document: {str(e)}") logger.warning(f"Could not set workflow context on document: {str(e)}")
@ -343,9 +327,7 @@ class GenerationService:
# Render the JSON content directly (AI generation handled by main service) # Render the JSON content directly (AI generation handled by main service)
renderedContent, mimeType = await renderer.render(contentToRender, title, userPrompt, aiService) renderedContent, mimeType = await renderer.render(contentToRender, title, userPrompt, aiService)
# Remove extra debug output file writes
logger.info(f"Successfully rendered JSON report to {outputFormat} format: {len(renderedContent)} characters")
return renderedContent, mimeType return renderedContent, mimeType
except Exception as e: except Exception as e:

View file

@ -57,14 +57,12 @@ class RendererRegistry:
# Register the renderer # Register the renderer
self._register_renderer_class(attr) self._register_renderer_class(attr)
logger.info(f"Discovered renderer: {attr.__name__} from {module_name}")
except Exception as e: except Exception as e:
logger.warning(f"Could not load renderer from {module_name}: {str(e)}") logger.warning(f"Could not load renderer from {module_name}: {str(e)}")
continue continue
self._discovered = True self._discovered = True
logger.info(f"Renderer discovery completed. Found {len(self._renderers)} renderers.")
except Exception as e: except Exception as e:
logger.error(f"Error during renderer discovery: {str(e)}") logger.error(f"Error during renderer discovery: {str(e)}")

View file

@ -184,7 +184,7 @@ JSON structure template (reference only - shows the pattern):
Instructions: Instructions:
- Start your response with {{"metadata": ...}} - return COMPLETE JSON from the beginning - Start your response with {{"metadata": ...}} - return COMPLETE JSON from the beginning
- Do NOT continue from the template examples above - create your own sections - Do NOT continue from the template examples above - create your own sections
- Generate content based on the user request - Generate complete content based on the user request
- Use the element structures shown in the template (heading, paragraph, list, table, code) - Use the element structures shown in the template (heading, paragraph, list, table, code)
- Create your own section IDs (do not use the example IDs like "section_heading_example") - Create your own section IDs (do not use the example IDs like "section_heading_example")
- When fully complete, add "complete_response": true at root level - When fully complete, add "complete_response": true at root level

View file

@ -529,7 +529,6 @@ class WorkflowService:
logger.error(f"Failed to store workflow stat: {e}") logger.error(f"Failed to store workflow stat: {e}")
raise raise
def updateMessage(self, messageId: str, messageData: Dict[str, Any]): def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
"""Update message by delegating to the chat interface""" """Update message by delegating to the chat interface"""
try: try:

View file

@ -46,23 +46,12 @@ class MethodAi(MethodBase):
operationId = f"ai_process_{workflowId}_{int(time.time())}" operationId = f"ai_process_{workflowId}_{int(time.time())}"
# Start progress tracking # Start progress tracking
if hasattr(self.services, 'workflow') and self.services.workflow: # TODO: Entfernen für PROD! (block) self.services.workflow.progressLogStart(
try:
self.services.workflow.progressLogStart(
operationId, operationId,
"Generate", "Generate",
"AI Processing", "AI Processing",
f"Format: {parameters.get('resultType', 'txt')}" f"Format: {parameters.get('resultType', 'txt')}"
) )
except Exception as e:
# Silently skip progress tracking errors (e.g., in test environments)
logger.debug(f"Skipping progress logging: {str(e)}")
# Debug logging to see what parameters are received
logger.info(f"MethodAi.process received parameters: {parameters}")
logger.info(f"Parameters type: {type(parameters)}")
logger.info(f"Parameters keys: {list(parameters.keys()) if isinstance(parameters, dict) else 'Not a dict'}")
aiPrompt = parameters.get("aiPrompt") aiPrompt = parameters.get("aiPrompt")
logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})") logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")
@ -132,27 +121,25 @@ class MethodAi(MethodBase):
mimeType=d.get("mimeType") or output_mime_type mimeType=d.get("mimeType") or output_mime_type
)) ))
# Complete progress tracking final_documents = action_documents
self.services.workflow.progressLogFinish(operationId, True) else:
extension = output_extension.lstrip('.')
return ActionResult.isSuccess(documents=action_documents) meaningful_name = self._generateMeaningfulFileName(
base_name="ai",
extension = output_extension.lstrip('.') extension=extension,
meaningful_name = self._generateMeaningfulFileName( action_name="result"
base_name="ai", )
extension=extension, action_document = ActionDocument(
action_name="result" documentName=meaningful_name,
) documentData=result,
action_document = ActionDocument( mimeType=output_mime_type
documentName=meaningful_name, )
documentData=result, final_documents = [action_document]
mimeType=output_mime_type
)
# Complete progress tracking # Complete progress tracking
self.services.workflow.progressLogFinish(operationId, True) self.services.workflow.progressLogFinish(operationId, True)
return ActionResult.isSuccess(documents=[action_document]) return ActionResult.isSuccess(documents=final_documents)
except Exception as e: except Exception as e:
logger.error(f"Error in AI processing: {str(e)}") logger.error(f"Error in AI processing: {str(e)}")

View file

@ -50,8 +50,9 @@ class MethodAiOperationsTester:
"resultType": "json" "resultType": "json"
}, },
OperationTypeEnum.DATA_GENERATE: { OperationTypeEnum.DATA_GENERATE: {
"aiPrompt": "Generate the first 4000 prime numbers.", # "aiPrompt": "Generate the first 1000 prime numbers.",
"resultType": "txt" "aiPrompt": "Schreibe einen Bericht über die Verhaltensweisen in Finnalnd's Saunas in 50 Kapiteln",
"resultType": "docx"
}, },
OperationTypeEnum.DATA_EXTRACT: { OperationTypeEnum.DATA_EXTRACT: {
"aiPrompt": "Extract all email addresses and phone numbers from the following text: 'Contact us at support@example.com or call 123-456-7890. For sales, email sales@example.com or call 987-654-3210.'", "aiPrompt": "Extract all email addresses and phone numbers from the following text: 'Contact us at support@example.com or call 123-456-7890. For sales, email sales@example.com or call 987-654-3210.'",
@ -60,7 +61,9 @@ class MethodAiOperationsTester:
OperationTypeEnum.IMAGE_ANALYSE: { OperationTypeEnum.IMAGE_ANALYSE: {
"aiPrompt": "Analyze this image and describe what you see, including any text or numbers visible.", "aiPrompt": "Analyze this image and describe what you see, including any text or numbers visible.",
"resultType": "json", "resultType": "json",
"documentList": ["_testdata_photo_2025-06-03_13-05-52.jpg"] if os.path.exists(os.path.join(self.logsDir, "_testdata_photo_2025-06-03_13-05-52.jpg")) else [] # documentList should contain document references resolvable by workflow service
# For testing, leave empty if no test image is available
"documentList": []
}, },
OperationTypeEnum.IMAGE_GENERATE: { OperationTypeEnum.IMAGE_GENERATE: {
"aiPrompt": "A beautiful sunset over the ocean with purple and orange hues", "aiPrompt": "A beautiful sunset over the ocean with purple and orange hues",
@ -289,8 +292,8 @@ class MethodAiOperationsTester:
print(f"{'='*80}") print(f"{'='*80}")
print("Testing DATA_GENERATE operation type...") print("Testing DATA_GENERATE operation type...")
# Test only DATA_GENERATE # Test only ONE operation type TODO
await self.testOperation(OperationTypeEnum.DATA_GENERATE) await self.testOperation(OperationTypeEnum.IMAGE_ANALYSE)
print(f"\n{''*80}") print(f"\n{''*80}")
# Print summary # Print summary