From 64b44473aa3479cf779669b1764e730e39fc8a24 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Fri, 2 Jan 2026 00:05:54 +0100
Subject: [PATCH] Fix data extraction and generation handling with parts
---
modules/aicore/aicorePluginPerplexity.py | 10 +-
modules/aicore/aicorePluginTavily.py | 8 +-
modules/datamodels/datamodelAi.py | 6 +-
.../datamodels/datamodelWorkflowActions.py | 2 +
modules/services/serviceAi/mainServiceAi.py | 4 +-
.../services/serviceAi/subAiCallLooping.py | 46 +-
.../serviceAi/subContentExtraction.py | 206 +-----
.../services/serviceAi/subStructureFilling.py | 439 +++++++++++-
.../serviceAi/subStructureGeneration.py | 74 +-
.../mainServiceExtraction.py | 674 +++++++++++-------
.../subPromptBuilderExtraction.py | 112 ++-
.../paths/ARCHITECTURE_ANALYSIS.md | 114 +++
.../paths/ARCHITECTURE_CHANGES.md | 77 ++
.../serviceGeneration/paths/documentPath.py | 101 +--
.../renderers/rendererXlsx.py | 89 ++-
modules/services/serviceWeb/mainServiceWeb.py | 8 +-
.../workflows/methods/methodAi/methodAi.py | 7 +
.../methods/methodContext/methodContext.py | 2 +
.../methods/methodOutlook/methodOutlook.py | 4 +
.../methodSharepoint/methodSharepoint.py | 9 +
.../processing/shared/placeholderFactory.py | 23 +-
modules/workflows/workflowManager.py | 9 +
tests/functional/test01_ai_model_selection.py | 8 +-
tests/functional/test02_ai_models.py | 6 +-
tests/functional/test03_ai_operations.py | 2 +-
.../services/test_json_extraction_merging.py | 386 ++++++++++
26 files changed, 1729 insertions(+), 697 deletions(-)
create mode 100644 modules/services/serviceGeneration/paths/ARCHITECTURE_ANALYSIS.md
create mode 100644 modules/services/serviceGeneration/paths/ARCHITECTURE_CHANGES.md
create mode 100644 tests/unit/services/test_json_extraction_merging.py
diff --git a/modules/aicore/aicorePluginPerplexity.py b/modules/aicore/aicorePluginPerplexity.py
index 3f1d9815..e129b047 100644
--- a/modules/aicore/aicorePluginPerplexity.py
+++ b/modules/aicore/aicorePluginPerplexity.py
@@ -70,7 +70,7 @@ class AiPerplexity(BaseConnectorAi):
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.ADVANCED,
operationTypes=createOperationTypeRatings(
- (OperationTypeEnum.WEB_SEARCH, 9),
+ (OperationTypeEnum.WEB_SEARCH_DATA, 9),
(OperationTypeEnum.WEB_CRAWL, 7)
),
version="sonar",
@@ -93,7 +93,7 @@ class AiPerplexity(BaseConnectorAi):
priority=PriorityEnum.QUALITY,
processingMode=ProcessingModeEnum.DETAILED,
operationTypes=createOperationTypeRatings(
- (OperationTypeEnum.WEB_SEARCH, 9),
+ (OperationTypeEnum.WEB_SEARCH_DATA, 9),
(OperationTypeEnum.WEB_CRAWL, 8)
),
version="sonar-pro",
@@ -211,7 +211,7 @@ class AiPerplexity(BaseConnectorAi):
"""
operationType = modelCall.options.operationType
- if operationType == OperationTypeEnum.WEB_SEARCH:
+ if operationType == OperationTypeEnum.WEB_SEARCH_DATA:
return await self.webSearch(modelCall)
elif operationType == OperationTypeEnum.WEB_CRAWL:
return await self.webCrawl(modelCall)
@@ -257,7 +257,7 @@ class AiPerplexity(BaseConnectorAi):
async def webSearch(self, modelCall: AiModelCall) -> AiModelResponse:
"""
- WEB_SEARCH operation - returns list of URLs based on search query.
+ WEB_SEARCH_DATA operation - returns list of URLs based on search query.
Args:
modelCall: AiModelCall with AiCallPromptWebSearch as prompt
@@ -340,7 +340,7 @@ Return ONLY a JSON array of URLs, no additional text:
content=content,
success=True,
modelId=model.name,
- metadata={"response_id": apiResponse.get("id", ""), "operation": "WEB_SEARCH"}
+ metadata={"response_id": apiResponse.get("id", ""), "operation": "WEB_SEARCH_DATA"}
)
except Exception as e:
diff --git a/modules/aicore/aicorePluginTavily.py b/modules/aicore/aicorePluginTavily.py
index 65a3aa6e..90718683 100644
--- a/modules/aicore/aicorePluginTavily.py
+++ b/modules/aicore/aicorePluginTavily.py
@@ -67,7 +67,7 @@ class AiTavily(BaseConnectorAi):
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC,
operationTypes=createOperationTypeRatings(
- (OperationTypeEnum.WEB_SEARCH, 9),
+ (OperationTypeEnum.WEB_SEARCH_DATA, 9),
(OperationTypeEnum.WEB_CRAWL, 10)
),
version="tavily-search",
@@ -445,7 +445,7 @@ class AiTavily(BaseConnectorAi):
"""
operationType = modelCall.options.operationType
- if operationType == OperationTypeEnum.WEB_SEARCH:
+ if operationType == OperationTypeEnum.WEB_SEARCH_DATA:
return await self.webSearch(modelCall)
elif operationType == OperationTypeEnum.WEB_CRAWL:
return await self.webCrawl(modelCall)
@@ -459,7 +459,7 @@ class AiTavily(BaseConnectorAi):
async def webSearch(self, modelCall: AiModelCall) -> "AiModelResponse":
"""
- WEB_SEARCH operation - returns list of URLs using Tavily search.
+ WEB_SEARCH_DATA operation - returns list of URLs using Tavily search.
Args:
modelCall: AiModelCall with AiCallPromptWebSearch as prompt
@@ -516,7 +516,7 @@ class AiTavily(BaseConnectorAi):
return AiModelResponse(
content=json.dumps(urls, indent=2),
success=True,
- metadata={"total_urls": len(urls), "operation": "WEB_SEARCH"}
+ metadata={"total_urls": len(urls), "operation": "WEB_SEARCH_DATA"}
)
except Exception as e:
diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py
index 2434c451..9e680164 100644
--- a/modules/datamodels/datamodelAi.py
+++ b/modules/datamodels/datamodelAi.py
@@ -25,7 +25,7 @@ class OperationTypeEnum(str, Enum):
IMAGE_GENERATE = "imageGenerate"
# Web Operations
- WEB_SEARCH = "webSearch" # Returns list of URLs only
+ WEB_SEARCH_DATA = "webSearch" # Returns list of URLs only
WEB_CRAWL = "webCrawl" # Web crawl for a given URL
@@ -50,7 +50,7 @@ def createOperationTypeRatings(*ratings: Tuple[OperationTypeEnum, int]) -> List[
Usage:
operationTypes = createOperationTypeRatings(
(OperationTypeEnum.DATA_ANALYSE, 8),
- (OperationTypeEnum.WEB_SEARCH, 10),
+ (OperationTypeEnum.WEB_SEARCH_DATA, 10),
(OperationTypeEnum.WEB_CRAWL, 9)
)
"""
@@ -197,7 +197,7 @@ class AiModelResponse(BaseModel):
# Structured prompt models for specialized operations
class AiCallPromptWebSearch(BaseModel):
- """Structured prompt format for WEB_SEARCH operation - returns list of URLs."""
+ """Structured prompt format for WEB_SEARCH_DATA operation - returns list of URLs."""
instruction: str = Field(description="Search instruction/query for finding relevant URLs")
country: Optional[str] = Field(default=None, description="Two-digit country code (lowercase, e.g., ch, us, de, fr)")
diff --git a/modules/datamodels/datamodelWorkflowActions.py b/modules/datamodels/datamodelWorkflowActions.py
index a3812955..8bac1fd5 100644
--- a/modules/datamodels/datamodelWorkflowActions.py
+++ b/modules/datamodels/datamodelWorkflowActions.py
@@ -56,6 +56,7 @@ class WorkflowActionDefinition(BaseModel):
)
category: Optional[str] = Field(None, description="Action category for grouping")
tags: List[str] = Field(default_factory=list, description="Tags for search/filtering")
+ dynamicMode: bool = Field(False, description="Whether this action is available in dynamic workflow mode (only tagged actions are visible in action planning and refinement prompts)")
# Register model labels for UI
@@ -68,6 +69,7 @@ registerModelLabels(
"parameters": {"en": "Parameters", "fr": "Paramètres"},
"category": {"en": "Category", "fr": "Catégorie"},
"tags": {"en": "Tags", "fr": "Étiquettes"},
+ "dynamicMode": {"en": "Dynamic Mode", "fr": "Mode dynamique"},
},
)
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index cb54a42b..6ab60b85 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -362,7 +362,7 @@ Respond with ONLY a JSON object in this exact format:
opType: OperationTypeEnum,
aiOperationId: str
) -> AiResponse:
- """Handle WEB_SEARCH and WEB_CRAWL operation types."""
+ """Handle WEB_SEARCH_DATA and WEB_CRAWL operation types."""
self.services.chat.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}")
request = AiCallRequest(
@@ -600,7 +600,7 @@ Respond with ONLY a JSON object in this exact format:
# Image generation - route to image path
return await self._handleImageGeneration(prompt, options, title, parentOperationId)
- if opType == OperationTypeEnum.WEB_SEARCH or opType == OperationTypeEnum.WEB_CRAWL:
+ if opType == OperationTypeEnum.WEB_SEARCH_DATA or opType == OperationTypeEnum.WEB_CRAWL:
return await self._handleWebOperation(prompt, options, opType, aiOperationId)
# Data generation - REQUIRES explicit generationIntent
diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py
index 63b0c806..62c91ce6 100644
--- a/modules/services/serviceAi/subAiCallLooping.py
+++ b/modules/services/serviceAi/subAiCallLooping.py
@@ -87,6 +87,7 @@ class AiCallLooper:
lastRawResponse = None # Store last raw JSON response for continuation
documentMetadata = None # Store document metadata (title, filename) from first iteration
accumulationState = None # Track accumulation state for string accumulation
+ accumulatedDirectJson = [] # Accumulate JSON strings for direct return use cases (chapter_structure, code_structure)
# Get parent operation ID for iteration operations (parentId should be operationId, not log entry ID)
parentOperationId = operationId # Use the parent's operationId directly
@@ -238,11 +239,54 @@ class AiCallLooper:
# Handle use cases that return JSON directly (no section extraction needed)
directReturnUseCases = ["section_content", "chapter_structure", "code_structure", "code_content", "image_batch"]
if useCaseId in directReturnUseCases:
+ # For chapter_structure and code_structure, check completeness and support looping
+ if useCaseId in ["chapter_structure", "code_structure"] and parsedJsonForUseCase:
+ isComplete = JsonResponseHandler.isJsonComplete(parsedJsonForUseCase)
+
+ if not isComplete:
+ logger.warning(f"Iteration {iteration}: Use case '{useCaseId}' - JSON is incomplete, continuing for continuation")
+ # Accumulate response for merging in next iteration
+ accumulatedDirectJson.append(result)
+
+ # Continue to next iteration - continuation prompt builder will handle the rest
+ if iterationOperationId:
+ self.services.chat.progressLogUpdate(iterationOperationId, 0.7, "JSON incomplete, requesting continuation")
+ self.services.chat.progressLogFinish(iterationOperationId, True)
+ continue
+ else:
+ # JSON is complete - merge accumulated responses if any
+ if accumulatedDirectJson:
+ logger.info(f"Iteration {iteration}: Merging {len(accumulatedDirectJson) + 1} accumulated responses")
+ # Merge accumulated JSON strings with current response
+ mergedJsonString = accumulatedDirectJson[0] if accumulatedDirectJson else result
+ for prevJson in accumulatedDirectJson[1:]:
+ mergedJsonString = JsonResponseHandler.mergeJsonStringsWithOverlap(mergedJsonString, prevJson)
+ # Finally merge with current response
+ mergedJsonString = JsonResponseHandler.mergeJsonStringsWithOverlap(mergedJsonString, result)
+
+ # Re-parse merged JSON
+ try:
+ extractedMerged = extractJsonString(mergedJsonString)
+ parsedMerged, parseError, _ = tryParseJson(extractedMerged)
+ if parseError is None and parsedMerged:
+ parsedJsonForUseCase = parsedMerged
+ result = mergedJsonString
+ logger.info(f"Successfully merged and parsed {len(accumulatedDirectJson) + 1} JSON fragments")
+ except Exception as e:
+ logger.warning(f"Failed to parse merged JSON, using last response: {e}")
+
+ logger.info(f"Iteration {iteration}: Use case '{useCaseId}' - JSON is complete")
+
logger.info(f"Iteration {iteration}: Use case '{useCaseId}' - returning JSON directly")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
- final_json = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False) if parsedJsonForUseCase else (extractedJsonForUseCase or result)
+ # For section_content, return raw result to allow merging of multiple JSON blocks
+ # The merging logic in subStructureFilling.py will handle extraction and merging
+ if useCaseId == "section_content":
+ final_json = result # Return raw response to preserve all JSON blocks
+ else:
+ final_json = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False) if parsedJsonForUseCase else (extractedJsonForUseCase or result)
# Write final result for chapter structure and code structure (section_content skips it)
if useCaseId in ["chapter_structure", "code_structure"]:
diff --git a/modules/services/serviceAi/subContentExtraction.py b/modules/services/serviceAi/subContentExtraction.py
index 229587f8..3eff0855 100644
--- a/modules/services/serviceAi/subContentExtraction.py
+++ b/modules/services/serviceAi/subContentExtraction.py
@@ -180,177 +180,41 @@ class ContentExtractor:
elif hasRenderIntent and not hasPartData:
logger.warning(f"⚠️ Part {part.id} has render intent but no data, skipping render part")
- # 3. Extract Intent: Erstelle Extracted ContentPart (möglicherweise mit zusätzlicher Verarbeitung)
+ # 3. Extract Intent: Erstelle Extracted ContentPart (NO AI processing here - happens during section generation)
if hasExtractIntent:
- # Spezielle Behandlung für Images: Vision AI für Text-Extraktion
+ # For images: Keep as image part with extract intent - Vision AI extraction happens during section generation
if part.typeGroup == "image" and hasPartData:
- logger.info(f"🔄 Processing image {part.id} with Vision AI (extract intent)")
- try:
- extractionPrompt = intent.extractionPrompt if intent and intent.extractionPrompt else "Extract all text content from this image. Return only the extracted text, no additional formatting."
- extractedText = await self.extractTextFromImage(part, extractionPrompt)
- if extractedText:
- # Prüfe ob es ein Error-Message ist
- isError = extractedText.startswith("[ERROR:")
-
- # Erstelle neuen Text-Part mit extrahiertem Text oder Error-Message
- textPart = ContentPart(
- id=f"extracted_{document.id}_{part.id}",
- label=f"Extracted text from {part.label or 'Image'}" if not isError else f"Error extracting from {part.label or 'Image'}",
- typeGroup="text",
- mimeType="text/plain",
- data=extractedText,
- metadata={
- "contentFormat": "extracted",
- "documentId": document.id,
- "intent": "extract",
- "originalFileName": preExtracted["originalDocument"]["fileName"],
- "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None,
- "extractionPrompt": extractionPrompt,
- "extractionMethod": "vision",
- "isError": isError
- }
- )
- allContentParts.append(textPart)
- if isError:
- logger.error(f"❌ Vision AI extraction failed for image {part.id}: {extractedText}")
- else:
- logger.info(f"✅ Extracted text from image {part.id} using Vision AI: {len(extractedText)} chars")
- else:
- # Sollte nicht vorkommen (Funktion gibt jetzt immer Error-Message zurück)
- errorMsg = f"Vision AI extraction failed: Unexpected empty response for image {part.id}"
- logger.error(errorMsg)
- errorPart = ContentPart(
- id=f"extracted_{document.id}_{part.id}",
- label=f"Error extracting from {part.label or 'Image'}",
- typeGroup="text",
- mimeType="text/plain",
- data=f"[ERROR: {errorMsg}]",
- metadata={
- "contentFormat": "extracted",
- "documentId": document.id,
- "intent": "extract",
- "originalFileName": preExtracted["originalDocument"]["fileName"],
- "extractionPrompt": extractionPrompt,
- "extractionMethod": "vision",
- "isError": True
- }
- )
- allContentParts.append(errorPart)
- except Exception as e:
- logger.error(f"❌ Failed to extract text from image {part.id}: {str(e)}")
- import traceback
- logger.debug(f"Traceback: {traceback.format_exc()}")
- # Kein Fallback: Wenn render Intent vorhanden, haben wir bereits object Part
- # Wenn nur extract Intent: Original Part ist kein Text, daher nicht als extracted hinzufügen
- if not hasRenderIntent:
- logger.debug(f"Image {part.id} has only extract intent, Vision AI failed - no extracted text available")
+ logger.info(f"📷 Image {part.id} with extract intent - will be processed with Vision AI during section generation")
+ # Keep image part as-is, mark with extract intent
+ part.metadata.update({
+ "contentFormat": "extracted", # Marked for extraction, but not yet extracted
+ "intent": "extract",
+ "originalFileName": preExtracted["originalDocument"]["fileName"],
+ "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None,
+ "extractionPrompt": intent.extractionPrompt if intent and intent.extractionPrompt else "Extract all text content from this image.",
+ "needsVisionExtraction": True # Flag to indicate Vision AI extraction needed
+ })
+ allContentParts.append(part)
+ originalPartAdded = True
else:
- # Für alle anderen Content-Typen: Prüfe ob AI-Verarbeitung benötigt wird
- # WICHTIG: Pre-extracted ContentParts von context.extractContent enthalten RAW extrahierten Content
- # (z.B. Text aus PDF-Text-Layer, Tabellen, etc.). Wenn "extract" Intent vorhanden ist,
- # muss dieser Content mit AI verarbeitet werden basierend auf extractionPrompt.
-
- # Prüfe ob Part Text-Content hat (kann mit AI verarbeitet werden)
- isTextContent = (
- part.typeGroup == "text" or
- part.typeGroup == "table" or
- (part.data and isinstance(part.data, str) and len(part.data.strip()) > 0)
- )
-
- if isTextContent and intent and intent.extractionPrompt:
- # Text-Content mit extractionPrompt: Verarbeite mit AI
- logger.info(f"🔄 Processing text content {part.id} with AI (extract intent with prompt)")
- try:
- extractionPrompt = intent.extractionPrompt
- processedText = await self.processTextContentWithAi(part, extractionPrompt)
- if processedText:
- # Prüfe ob es ein Error-Message ist
- isError = processedText.startswith("[ERROR:")
-
- # Erstelle neuen Text-Part mit AI-verarbeitetem Text oder Error-Message
- processedPart = ContentPart(
- id=f"extracted_{document.id}_{part.id}",
- label=f"AI-processed: {part.label or 'Content'}" if not isError else f"Error processing {part.label or 'Content'}",
- typeGroup="text",
- mimeType="text/plain",
- data=processedText,
- metadata={
- "contentFormat": "extracted",
- "documentId": document.id,
- "intent": "extract",
- "originalFileName": preExtracted["originalDocument"]["fileName"],
- "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None,
- "extractionPrompt": extractionPrompt,
- "extractionMethod": "ai",
- "sourcePartId": part.id,
- "fromExtractContent": True,
- "isError": isError
- }
- )
- allContentParts.append(processedPart)
- originalPartAdded = True
- if isError:
- logger.error(f"❌ AI text processing failed for part {part.id}: {processedText}")
- else:
- logger.info(f"✅ Processed text content {part.id} with AI: {len(processedText)} chars")
- else:
- # Sollte nicht vorkommen (Funktion gibt jetzt immer Error-Message zurück)
- errorMsg = f"AI text processing failed: Unexpected empty response for part {part.id}"
- logger.error(errorMsg)
- errorPart = ContentPart(
- id=f"extracted_{document.id}_{part.id}",
- label=f"Error processing {part.label or 'Content'}",
- typeGroup="text",
- mimeType="text/plain",
- data=f"[ERROR: {errorMsg}]",
- metadata={
- "contentFormat": "extracted",
- "documentId": document.id,
- "intent": "extract",
- "originalFileName": preExtracted["originalDocument"]["fileName"],
- "extractionPrompt": extractionPrompt,
- "extractionMethod": "ai",
- "sourcePartId": part.id,
- "isError": True
- }
- )
- allContentParts.append(errorPart)
- originalPartAdded = True
- except Exception as e:
- logger.error(f"❌ Failed to process text content {part.id} with AI: {str(e)}")
- import traceback
- logger.debug(f"Traceback: {traceback.format_exc()}")
- # Fallback: Verwende Original-Part
- if not originalPartAdded:
- part.metadata.update({
- "contentFormat": "extracted",
- "intent": "extract",
- "fromExtractContent": True,
- "skipExtraction": True,
- "originalFileName": preExtracted["originalDocument"]["fileName"],
- "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None
- })
- allContentParts.append(part)
- originalPartAdded = True
- else:
- # Kein extractionPrompt oder kein Text-Content: Verwende Part direkt als extracted
- # (Content ist bereits extrahiert von context.extractContent, keine weitere AI-Verarbeitung nötig)
- # WICHTIG: Nur hinzufügen wenn noch nicht hinzugefügt (z.B. durch render Intent)
- if not originalPartAdded:
- part.metadata.update({
- "contentFormat": "extracted",
- "intent": "extract",
- "fromExtractContent": True,
- "skipExtraction": True, # Bereits extrahiert
- "originalFileName": preExtracted["originalDocument"]["fileName"],
- "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None
- })
- # Stelle sicher dass contentFormat gesetzt ist
- if "contentFormat" not in part.metadata:
- part.metadata["contentFormat"] = "extracted"
- allContentParts.append(part)
- originalPartAdded = True
- logger.debug(f"✅ Using pre-extracted ContentPart {part.id} as extracted (no AI processing needed)")
+ # For text/table content: Use directly as extracted (no AI processing here)
+ # AI processing with extractionPrompt happens during section generation
+ if not originalPartAdded:
+ part.metadata.update({
+ "contentFormat": "extracted",
+ "intent": "extract",
+ "fromExtractContent": True,
+ "skipExtraction": True, # Already extracted (raw extraction)
+ "originalFileName": preExtracted["originalDocument"]["fileName"],
+ "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None,
+ "extractionPrompt": intent.extractionPrompt if intent and intent.extractionPrompt else None
+ })
+ # Stelle sicher dass contentFormat gesetzt ist
+ if "contentFormat" not in part.metadata:
+ part.metadata["contentFormat"] = "extracted"
+ allContentParts.append(part)
+ originalPartAdded = True
+ logger.debug(f"✅ Using pre-extracted ContentPart {part.id} as extracted (no AI processing needed)")
# 4. Fallback: Wenn kein Intent vorhanden oder Part wurde noch nicht hinzugefügt
# (sollte normalerweise nicht vorkommen, da default "extract" ist)
@@ -508,6 +372,12 @@ class ContentExtractor:
# Verknüpfung zu object Part (falls vorhanden)
"relatedObjectPartId": f"obj_{document.id}" if "render" in intent.intents else None
})
+
+ # For images: Mark that Vision AI extraction is needed during section generation
+ if part.typeGroup == "image":
+ part.metadata["needsVisionExtraction"] = True
+ logger.info(f"📷 Image part {part.id} marked for Vision AI extraction during section generation")
+
# Stelle sicher, dass ID eindeutig ist (falls object Part existiert)
if "render" in intent.intents:
part.id = f"ext_{document.id}_{part.id}"
diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py
index 5a917279..c301bc67 100644
--- a/modules/services/serviceAi/subStructureFilling.py
+++ b/modules/services/serviceAi/subStructureFilling.py
@@ -12,7 +12,7 @@ import json
import logging
import copy
import asyncio
-from typing import Dict, Any, List, Optional
+from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
@@ -51,6 +51,41 @@ class StructureFiller:
pass
return 'en' # Default fallback
+ def _extractContentPartInfo(self, chapter: Dict[str, Any]) -> Tuple[List[str], Dict[str, Any]]:
+ """
+ Extract contentPartIds and contentPartInstructions from chapter's contentParts structure.
+
+ Returns:
+ tuple: (contentPartIds list, contentPartInstructions dict)
+ """
+ contentParts = chapter.get("contentParts", {})
+ contentPartIds = list(contentParts.keys())
+ # Extract instructions (only entries with "instruction" field)
+ contentPartInstructions = {}
+ for partId, partInfo in contentParts.items():
+ if isinstance(partInfo, dict) and "instruction" in partInfo:
+ contentPartInstructions[partId] = {"instruction": partInfo["instruction"]}
+ return contentPartIds, contentPartInstructions
+
+ def _getContentPartCaption(self, chapter: Dict[str, Any], partId: str) -> Optional[str]:
+ """
+ Get caption for a contentPart from chapter's contentParts structure.
+ Returns None if no caption is available.
+
+ Args:
+ chapter: Chapter dict
+ partId: ContentPart ID
+
+ Returns:
+ Caption string or None
+ """
+ if "contentParts" in chapter:
+ contentParts = chapter.get("contentParts", {})
+ partInfo = contentParts.get(partId)
+ if isinstance(partInfo, dict) and "caption" in partInfo:
+ return partInfo["caption"]
+ return None
+
async def fillStructure(
self,
structure: Dict[str, Any],
@@ -282,8 +317,7 @@ class StructureFiller:
chapterLevel = chapter.get("level", 1)
chapterTitle = chapter.get("title", "Untitled Chapter")
generationHint = chapter.get("generationHint", "")
- contentPartIds = chapter.get("contentPartIds", [])
- contentPartInstructions = chapter.get("contentPartInstructions", {})
+ contentPartIds, contentPartInstructions = self._extractContentPartInfo(chapter)
# Create task for parallel processing with semaphore
async def processChapterWithSemaphore(chapter, chapterIndex, chapterId, chapterLevel, chapterTitle, generationHint, contentPartIds, contentPartInstructions):
@@ -550,13 +584,69 @@ class StructureFiller:
}
})
- # Aggregiere extracted Parts mit AI
- if extractedParts:
- logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI")
+ # Extract images with Vision AI if needed (before aggregation)
+ processedExtractedParts = []
+ for part in extractedParts:
+ # Check if this is an image that needs Vision AI extraction
+ if (part.typeGroup == "image" and
+ part.metadata.get("needsVisionExtraction") == True and
+ part.metadata.get("intent") == "extract"):
+
+ logger.info(f"Section {sectionId}: Extracting text from image {part.id} using Vision AI")
+ try:
+ extractionPrompt = part.metadata.get("extractionPrompt") or "Extract all text content from this image. Return only the extracted text, no additional formatting."
+
+ # Call Vision AI to extract text from image
+ visionRequest = AiCallRequest(
+ prompt=extractionPrompt,
+ context="",
+ options=AiCallOptions(operationType=OperationTypeEnum.IMAGE_ANALYSE),
+ contentParts=[part]
+ )
+
+ visionResponse = await self.aiService.callAi(visionRequest)
+
+ if visionResponse and visionResponse.content:
+ # Create text part with extracted content
+ textPart = ContentPart(
+ id=f"vision_extracted_{part.id}",
+ label=f"Extracted text from {part.label or 'Image'}",
+ typeGroup="text",
+ mimeType="text/plain",
+ data=visionResponse.content.strip(),
+ metadata={
+ **part.metadata,
+ "contentFormat": "extracted",
+ "extractionMethod": "vision",
+ "sourceImagePartId": part.id,
+ "needsVisionExtraction": False # Already extracted
+ }
+ )
+ processedExtractedParts.append(textPart)
+ logger.info(f"✅ Extracted text from image {part.id}: {len(visionResponse.content)} chars")
+ else:
+ logger.warning(f"⚠️ Vision AI extraction returned no content for image {part.id}")
+ # Keep original image part, but mark extraction as attempted
+ part.metadata["needsVisionExtraction"] = False
+ part.metadata["visionExtractionFailed"] = True
+ processedExtractedParts.append(part)
+ except Exception as e:
+ logger.error(f"❌ Vision AI extraction failed for image {part.id}: {str(e)}")
+ # Keep original image part, but mark extraction as attempted
+ part.metadata["needsVisionExtraction"] = False
+ part.metadata["visionExtractionFailed"] = True
+ processedExtractedParts.append(part)
+ else:
+ # Not an image needing extraction, or already processed
+ processedExtractedParts.append(part)
+
+ # Aggregiere extracted Parts mit AI (now with Vision-extracted text parts)
+ if processedExtractedParts:
+ logger.debug(f"Section {sectionId}: Aggregating {len(processedExtractedParts)} extracted parts with AI")
isAggregation = True
generationPrompt = self._buildSectionGenerationPrompt(
section=section,
- contentParts=extractedParts,
+ contentParts=processedExtractedParts,
userPrompt=userPrompt,
generationHint=generationHint,
allSections=all_sections_list,
@@ -685,34 +775,40 @@ The JSON should be a fragment that can be merged with the previous response."""
# Use tryParseJson which handles extraction and basic parsing
from modules.shared.jsonUtils import tryParseJson, repairBrokenJson
- parsedResponse, parseError, cleanedStr = tryParseJson(aiResponseJson)
-
- # If parsing failed, try repair
- if parseError and isinstance(aiResponseJson, str):
- logger.warning(f"Initial JSON parse failed for section {sectionId}, attempting repair: {str(parseError)}")
- repairedJson = repairBrokenJson(aiResponseJson)
- if repairedJson:
- parsedResponse = repairedJson
- parseError = None
- logger.info(f"Successfully repaired JSON for section {sectionId}")
-
- if parseError:
- raise parseError
-
- if isinstance(parsedResponse, list):
- generatedElements = parsedResponse
- elif isinstance(parsedResponse, dict):
- if "elements" in parsedResponse:
- generatedElements = parsedResponse["elements"]
- elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0:
- firstSection = parsedResponse["sections"][0]
- generatedElements = firstSection.get("elements", [])
- elif parsedResponse.get("type"):
- generatedElements = [parsedResponse]
+ # Check if response contains multiple JSON blocks (separated by --- or multiple ```json blocks)
+ # This can happen when AI returns multiple complete responses
+ if isinstance(aiResponseJson, str) and ("---" in aiResponseJson or aiResponseJson.count("```json") > 1):
+ logger.info(f"Section {sectionId}: Detected multiple JSON blocks in response, attempting to merge")
+ generatedElements = self._extractAndMergeMultipleJsonBlocks(aiResponseJson, contentType, sectionId)
+ else:
+ parsedResponse, parseError, cleanedStr = tryParseJson(aiResponseJson)
+
+ # If parsing failed, try repair
+ if parseError and isinstance(aiResponseJson, str):
+ logger.warning(f"Initial JSON parse failed for section {sectionId}, attempting repair: {str(parseError)}")
+ repairedJson = repairBrokenJson(aiResponseJson)
+ if repairedJson:
+ parsedResponse = repairedJson
+ parseError = None
+ logger.info(f"Successfully repaired JSON for section {sectionId}")
+
+ if parseError:
+ raise parseError
+
+ if isinstance(parsedResponse, list):
+ generatedElements = parsedResponse
+ elif isinstance(parsedResponse, dict):
+ if "elements" in parsedResponse:
+ generatedElements = parsedResponse["elements"]
+ elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0:
+ firstSection = parsedResponse["sections"][0]
+ generatedElements = firstSection.get("elements", [])
+ elif parsedResponse.get("type"):
+ generatedElements = [parsedResponse]
+ else:
+ generatedElements = []
else:
generatedElements = []
- else:
- generatedElements = []
class AiResponse:
def __init__(self, content):
@@ -1006,8 +1102,54 @@ The JSON should be a fragment that can be merged with the previous response."""
})
elif contentFormat == "extracted":
+ # Check if this is an image that needs Vision AI extraction
+ originalPartId = part.id
+ if (part.typeGroup == "image" and
+ part.metadata.get("needsVisionExtraction") == True and
+ part.metadata.get("intent") == "extract"):
+
+ logger.info(f"Section {sectionId}: Extracting text from single image {part.id} using Vision AI")
+ try:
+ extractionPrompt = part.metadata.get("extractionPrompt") or "Extract all text content from this image. Return only the extracted text, no additional formatting."
+
+ # Call Vision AI to extract text from image
+ visionRequest = AiCallRequest(
+ prompt=extractionPrompt,
+ context="",
+ options=AiCallOptions(operationType=OperationTypeEnum.IMAGE_ANALYSE),
+ contentParts=[part]
+ )
+
+ visionResponse = await self.aiService.callAi(visionRequest)
+
+ if visionResponse and visionResponse.content:
+ # Replace image part with text part for further processing
+ part = ContentPart(
+ id=f"vision_extracted_{originalPartId}",
+ label=f"Extracted text from {part.label or 'Image'}",
+ typeGroup="text",
+ mimeType="text/plain",
+ data=visionResponse.content.strip(),
+ metadata={
+ **part.metadata,
+ "contentFormat": "extracted",
+ "extractionMethod": "vision",
+ "sourceImagePartId": originalPartId,
+ "needsVisionExtraction": False # Already extracted
+ }
+ )
+ logger.info(f"✅ Extracted text from image {originalPartId}: {len(visionResponse.content)} chars")
+ else:
+ logger.warning(f"⚠️ Vision AI extraction returned no content for image {originalPartId}")
+ part.metadata["needsVisionExtraction"] = False
+ part.metadata["visionExtractionFailed"] = True
+ except Exception as e:
+ logger.error(f"❌ Vision AI extraction failed for image {originalPartId}: {str(e)}")
+ part.metadata["needsVisionExtraction"] = False
+ part.metadata["visionExtractionFailed"] = True
+
if useAiCall and generationHint:
- # AI-Call mit einzelnen ContentPart
+ # AI-Call mit einzelnen ContentPart (now may be text part after Vision extraction)
logger.debug(f"Processing section {sectionId}: Single extracted part with AI call")
generationPrompt = self._buildSectionGenerationPrompt(
section=section,
@@ -1405,7 +1547,7 @@ The JSON should be a fragment that can be merged with the previous response."""
if "chapters" in doc:
for chapter in doc.get("chapters", []):
# Füge Metadaten zu Chapter-Level contentPartIds hinzu
- chapterContentPartIds = chapter.get("contentPartIds", [])
+ chapterContentPartIds, _ = self._extractContentPartInfo(chapter)
if chapterContentPartIds:
chapter["contentPartsMetadata"] = []
for partId in chapterContentPartIds:
@@ -1793,6 +1935,233 @@ CRITICAL:
"""
return prompt
+ def _extractAndMergeMultipleJsonBlocks(self, responseText: str, contentType: str, sectionId: str) -> List[Dict[str, Any]]:
+ """
+ Extract multiple JSON blocks from response and merge them appropriately.
+ For tables: Merge all rows into a single table.
+ For other types: Combine elements.
+ """
+ from modules.shared.jsonUtils import tryParseJson, stripCodeFences, normalizeJsonText, extractFirstBalancedJson
+
+ # Extract all JSON blocks, handling both --- separators and multiple ```json blocks
+ blocks = []
+
+ # Strategy: Extract all ```json blocks first (most reliable), then fall back to other methods
+ # This handles cases where --- separators and ```json blocks are mixed
+ if "```json" in responseText:
+ # Extract all ```json blocks regardless of --- separators
+ jsonParts = responseText.split("```json")
+ for jsonPart in jsonParts[1:]: # Skip first empty part
+ jsonPart = "```json" + jsonPart
+ # Extract just the JSON block (until closing ```)
+ closingFence = jsonPart.find("```", 7) # Find closing ``` after "```json"
+ if closingFence != -1:
+ jsonPart = jsonPart[:closingFence + 3]
+ jsonPart = jsonPart.strip()
+ if jsonPart:
+ blocks.append(jsonPart)
+
+ # If no ```json blocks found, try splitting by --- and extracting JSON
+ if not blocks and "---" in responseText:
+ parts = responseText.split("---")
+ for part in parts:
+ part = part.strip()
+ if not part:
+ continue
+
+ # Try to extract JSON directly from this part
+ normalized = normalizeJsonText(part)
+ normalized = stripCodeFences(normalized)
+ jsonBlock = extractFirstBalancedJson(normalized)
+ if jsonBlock:
+ blocks.append(jsonBlock)
+ elif responseText.count("```json") > 1:
+ # Split by ```json markers (no --- separator)
+ parts = responseText.split("```json")
+ for part in parts[1:]: # Skip first empty part
+ part = "```json" + part
+ part = part.strip()
+ if part:
+ blocks.append(part)
+ else:
+ # Try to find multiple JSON objects/arrays directly
+ normalized = normalizeJsonText(responseText)
+ normalized = stripCodeFences(normalized)
+
+ # Find all JSON blocks
+ start = 0
+ while start < len(normalized):
+ # Find next JSON start
+ brace = normalized.find('{', start)
+ bracket = normalized.find('[', start)
+ jsonStart = -1
+ if brace != -1 and (bracket == -1 or brace < bracket):
+ jsonStart = brace
+ elif bracket != -1:
+ jsonStart = bracket
+
+ if jsonStart == -1:
+ break
+
+ # Extract balanced JSON
+ jsonBlock = extractFirstBalancedJson(normalized[jsonStart:])
+ if jsonBlock:
+ blocks.append(jsonBlock)
+ start = jsonStart + len(jsonBlock)
+ else:
+ break
+
+ if not blocks:
+ logger.warning(f"Section {sectionId}: Could not extract multiple JSON blocks")
+ return []
+
+ logger.info(f"Section {sectionId}: Extracted {len(blocks)} JSON blocks, merging for contentType={contentType}")
+
+ # Parse all blocks
+ allElements = []
+ for i, block in enumerate(blocks):
+ parsed, parseError, _ = tryParseJson(block)
+ if parseError:
+ logger.warning(f"Section {sectionId}: Failed to parse JSON block {i+1}: {str(parseError)}")
+ continue
+
+ elementsFromBlock = []
+ if isinstance(parsed, dict):
+ if "elements" in parsed:
+ elementsFromBlock = parsed["elements"]
+ allElements.extend(elementsFromBlock)
+ elif parsed.get("type"):
+ elementsFromBlock = [parsed]
+ allElements.append(parsed)
+ elif isinstance(parsed, list):
+ elementsFromBlock = parsed
+ allElements.extend(parsed)
+
+ # Log row count for table elements
+ if contentType == "table":
+ tableCount = sum(1 for e in elementsFromBlock if isinstance(e, dict) and e.get("type") == "table")
+ rowCount = sum(
+ len(e.get("content", {}).get("rows", []))
+ for e in elementsFromBlock
+ if isinstance(e, dict) and e.get("type") == "table"
+ )
+ if tableCount > 0:
+ logger.info(f"Section {sectionId}: JSON block {i+1}: {tableCount} table(s) with {rowCount} total rows")
+
+ # Merge elements based on contentType
+ if contentType == "table" and len(allElements) > 1:
+ # Find all table elements
+ tableElements = [e for e in allElements if isinstance(e, dict) and e.get("type") == "table"]
+ if len(tableElements) > 1:
+ # Check if tables can be merged (same column counts)
+ canMerge = self._canMergeTables(tableElements)
+ if canMerge:
+ logger.info(f"Section {sectionId}: Merging {len(tableElements)} tables into one")
+ mergedTable = self._mergeTableElements(tableElements)
+ # Replace all table elements with merged one
+ nonTableElements = [e for e in allElements if not (isinstance(e, dict) and e.get("type") == "table")]
+ return [mergedTable] + nonTableElements
+ else:
+ logger.warning(f"Section {sectionId}: Cannot merge {len(tableElements)} tables (incompatible headers/columns). Keeping tables separate.")
+ # Return all elements as-is (tables remain separate)
+ return allElements
+
+ return allElements
+
+ def _canMergeTables(self, tableElements: List[Dict[str, Any]]) -> bool:
+ """Check if tables can be safely merged (same column counts)."""
+ if len(tableElements) <= 1:
+ return True
+
+ # Extract column counts from all tables
+ columnCounts = []
+ for table in tableElements:
+ headers = []
+ if isinstance(table.get("content"), dict):
+ headers = table["content"].get("headers", [])
+ elif isinstance(table.get("content"), list):
+ # Old format: content is list of rows
+ if table["content"] and isinstance(table["content"][0], list):
+ headers = table["content"][0]
+ columnCounts.append(len(headers))
+
+ # Check if all tables have the same column count
+ firstCount = columnCounts[0] if columnCounts else 0
+ return all(count == firstCount for count in columnCounts)
+
+ def _mergeTableElements(self, tableElements: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """Merge multiple table elements into a single table.
+ Assumes tables have compatible column counts (checked by _canMergeTables).
+ """
+ if not tableElements:
+ return {"type": "table", "content": {"headers": [], "rows": []}}
+
+ if len(tableElements) == 1:
+ return tableElements[0]
+
+ # Extract headers from all tables
+ allHeaders = []
+ for table in tableElements:
+ headers = []
+ if isinstance(table.get("content"), dict):
+ headers = table["content"].get("headers", [])
+ elif isinstance(table.get("content"), list):
+ # Old format: content is list of rows
+ if table["content"] and isinstance(table["content"][0], list):
+ headers = table["content"][0]
+ allHeaders.append(headers)
+
+ # Check header compatibility (same headers or just same column count)
+ firstHeaders = allHeaders[0]
+ headersCompatible = all(headers == firstHeaders for headers in allHeaders)
+
+ # If headers differ but column counts match, use first table's headers and log warning
+ if not headersCompatible:
+ logger.warning(f"Merging {len(tableElements)} tables with different headers but same column count. Using headers from first table.")
+
+ # Use headers from first table
+ headers = firstHeaders
+
+ # Collect all rows from all tables, validating column count
+ allRows = []
+ for tableIdx, table in enumerate(tableElements):
+ rows = []
+ if isinstance(table.get("content"), dict):
+ rows = table["content"].get("rows", [])
+ elif isinstance(table.get("content"), list):
+ # Old format: content is list of rows
+ if table["content"] and isinstance(table["content"][0], list):
+ rows = table["content"][1:] if len(table["content"]) > 1 else []
+
+ # Validate row column count matches header count
+ expectedColCount = len(headers)
+ validRows = []
+ for rowIdx, row in enumerate(rows):
+ if isinstance(row, list):
+ if len(row) == expectedColCount:
+ validRows.append(row)
+ else:
+ logger.warning(f"Table {tableIdx+1}, row {rowIdx+1}: column count mismatch ({len(row)} vs {expectedColCount}), skipping row")
+ elif isinstance(row, dict):
+ # Convert dict row to list based on header order
+ rowList = [row.get(h, "") for h in headers]
+ validRows.append(rowList)
+ else:
+ logger.warning(f"Table {tableIdx+1}, row {rowIdx+1}: invalid row format, skipping")
+
+ allRows.extend(validRows)
+
+ # Keep all rows, including duplicates (duplicates may be intentional)
+ logger.info(f"Merged {len(tableElements)} tables: {len(allRows)} total rows (duplicates preserved)")
+
+ return {
+ "type": "table",
+ "content": {
+ "headers": headers,
+ "rows": allRows
+ }
+ }
+
def _findContentPartById(self, partId: str, contentParts: List[ContentPart]) -> Optional[ContentPart]:
"""Finde ContentPart nach ID."""
for part in contentParts:
diff --git a/modules/services/serviceAi/subStructureGeneration.py b/modules/services/serviceAi/subStructureGeneration.py
index cbabd2fc..88f2e1e1 100644
--- a/modules/services/serviceAi/subStructureGeneration.py
+++ b/modules/services/serviceAi/subStructureGeneration.py
@@ -50,8 +50,7 @@ class StructureGenerator:
Phase 5C: Generiert Chapter-Struktur (Table of Contents).
Definiert für jedes Chapter:
- Level, Title
- - contentPartIds
- - contentPartInstructions
+ - contentParts (unified object with instruction and/or caption per part)
- generationHint
Args:
@@ -137,6 +136,9 @@ Continue generating the remaining chapters now.
return basePrompt
# Call AI with looping support
+ # NOTE: Do NOT pass contentParts here - we only need metadata for structure generation
+ # The contentParts metadata is already included in the prompt (contentPartsIndex)
+ # Actual content extraction happens later during section generation
aiResponseJson = await self.aiService.callAiWithLooping(
prompt=structurePrompt,
options=options,
@@ -150,7 +152,7 @@ Continue generating the remaining chapters now.
useCaseId="chapter_structure", # REQUIRED: Explicit use case ID
operationId=structureOperationId,
userPrompt=userPrompt,
- contentParts=contentParts
+ contentParts=None # Do not pass ContentParts - only metadata needed, not content extraction
)
# Parse the complete JSON response (looping system already handles completion)
@@ -248,7 +250,9 @@ Continue generating the remaining chapters now.
language = self._getUserLanguage()
logger.debug(f"Using language from services (user intention analysis) for structure generation: {language}")
- prompt = f"""USER REQUEST (for context):
+ prompt = f"""CRITICAL OUTPUT REQUIREMENT: This is a PLANNING task, not a generation task. You MUST return EXACTLY ONE complete JSON object. Do NOT generate multiple JSON objects, alternatives, or variations. Do NOT use separators like "---" between JSON objects. Return the single best JSON structure that matches the requirements below.
+
+USER REQUEST (for context):
```
{userPrompt}
```
@@ -265,40 +269,23 @@ IMPORTANT - CHAPTER INDEPENDENCE:
- One chapter does NOT have information about another chapter
- Each chapter must provide its own context and be understandable alone
-CRITICAL - CONTENT ASSIGNMENT TO CHAPTERS:
-- You MUST assign available ContentParts to chapters using contentPartIds
-- Based on the user request, determine which content should be used in which chapter
-- If the user request mentions specific content, assign the corresponding ContentPart to the appropriate chapter
-- Chapters WITHOUT contentPartIds can only generate generic content, NOT document-specific analysis
-- To include document content analysis, chapters MUST have contentPartIds assigned
-- Review the user request carefully to match ContentParts to chapters based on context and purpose
+CONTENT ASSIGNMENT:
+- Assign ContentParts to chapters via contentParts object
+- For data extraction, the type of a contentPart (image, text, etc.) is NOT relevant - only what is specified in the instruction matters
+- Include ALL relevant parts from same source when needed for structured data extraction
+- Each contentPart can have either:
+ - "instruction": For AI extraction prompts (how to process/extract from this part)
+ - "caption": For user-facing presentation (how to display/reference this part in the document)
+ - Both can be present if needed
+- Chapters without contentParts can only generate generic content (not document-specific)
-CRITICAL - CHAPTERS WITHOUT CONTENT PARTS:
-- If contentPartIds is EMPTY, generationHint MUST be VERY DETAILED with all context needed to generate content from scratch
-- Include: what to generate, what information to include, purpose, specific details
-- Without content parts, AI relies ENTIRELY on generationHint and CANNOT analyze document content
+FORMATTING:
+- Formatting is handled automatically - focus on CONTENT and STRUCTURE only
-IMPORTANT - FORMATTING:
-- Formatting (fonts, colors, layouts, styles) is handled AUTOMATICALLY by the renderer
-- Do NOT specify formatting details in generationHint unless it's content-specific (e.g., "pie chart with 3 segments")
-- Focus on CONTENT and STRUCTURE, not visual formatting
-- The renderer will apply appropriate styling based on the output format ({outputFormat})
-
-For each chapter:
-- chapter id
-- level (1, 2, 3, etc.)
-- title
-- contentPartIds: [List of ContentPart IDs] - ASSIGN content based on user request and chapter purpose
-- contentPartInstructions: {{
- "partId": {{
- "instruction": "How content should be structured"
- }}
-}}
-- generationHint: Description of the content (must be self-contained with all necessary context)
- * If contentPartIds is EMPTY, generationHint MUST be VERY DETAILED with all context needed to generate content from scratch
- * Focus on content and structure, NOT formatting details
-
-OUTPUT FORMAT: {outputFormat}
+CHAPTER STRUCTURE:
+- chapter id, level (1, 2, 3, etc.), title
+- contentParts: {{"partId": {{"instruction": "..."}} or {{"caption": "..."}} or both}} - Compact mapping of part IDs to their extraction instructions and/or presentation captions
+- generationHint: Self-contained description (if contentParts is empty, must be VERY DETAILED)
RETURN JSON:
{{
@@ -315,10 +302,16 @@ RETURN JSON:
"id": "chapter_1",
"level": 1,
"title": "Introduction",
- "contentPartIds": ["part_ext_1"],
- "contentPartInstructions": {{
+ "contentParts": {{
"part_ext_1": {{
"instruction": "Use full extracted text"
+ }},
+ "part_img_1": {{
+ "instruction": "Analyze image for additional details"
+ }},
+ "part_img_2": {{
+ "instruction": "Analyze image for additional details",
+ "caption": "Figure 1: Overview diagram"
}}
}},
"generationHint": "Create introduction section",
@@ -328,8 +321,7 @@ RETURN JSON:
"id": "chapter_2",
"level": 1,
"title": "Main Title",
- "contentPartIds": [],
- "contentPartInstructions": {{}},
+ "contentParts": {{}},
"generationHint": "Create [specific content description] with [formatting details]. Include [required information]. Purpose: [explanation of what this chapter provides].",
"sections": []
}}
@@ -337,7 +329,7 @@ RETURN JSON:
}}]
}}
-Return ONLY valid JSON following the structure above.
+OUTPUT FORMAT: Start with {{ and end with }}. Do NOT use markdown code fences (```json). Do NOT add explanatory text before or after the JSON. Return ONLY the JSON object itself.
"""
return prompt
diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py
index 618a86e8..7c4649e8 100644
--- a/modules/services/serviceExtraction/mainServiceExtraction.py
+++ b/modules/services/serviceExtraction/mainServiceExtraction.py
@@ -15,6 +15,7 @@ from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallResponse, AiCallRequest, AiCallOptions, OperationTypeEnum, AiModelCall
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
+from modules.shared.jsonUtils import stripCodeFences
logger = logging.getLogger(__name__)
@@ -164,6 +165,29 @@ class ExtractionService:
if "sourceAction" not in p.metadata:
p.metadata["sourceAction"] = "extraction.extractContent"
+ # Write debug file for each text part extracted (without AI)
+ for j, part in enumerate(ec.parts):
+ if part.typeGroup == "text" and part.data and self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
+ try:
+ debug_content = {
+ "partIndex": j + 1,
+ "partId": part.id,
+ "typeGroup": part.typeGroup,
+ "mimeType": part.mimeType or "text/plain",
+ "label": part.label,
+ "dataLength": len(part.data),
+ "metadata": part.metadata.copy() if part.metadata else {},
+ "data": part.data # Full extracted text
+ }
+ debug_json = json.dumps(debug_content, indent=2, ensure_ascii=False)
+ # Use document name and part index for filename
+ doc_name_safe = documentData["fileName"].replace(" ", "_").replace("/", "_").replace("\\", "_")[:50]
+ debug_filename = f"extraction_text_part_{j+1}_{doc_name_safe}.txt"
+ self.services.utils.writeDebugFile(debug_json, debug_filename)
+ logger.info(f"Wrote debug file for extracted text part {j+1}/{len(ec.parts)}: {debug_filename}")
+ except Exception as e:
+ logger.warning(f"Failed to write debug file for text part {j+1}: {str(e)}")
+
# Log chunking information
chunkedParts = [p for p in ec.parts if p.metadata.get("chunk", False)]
if chunkedParts:
@@ -263,256 +287,6 @@ class ExtractionService:
return results
- def mergeAiResults(
- self,
- extractedContent: List[ContentExtracted],
- aiResults: List[str],
- strategy: MergeStrategy
- ) -> ContentExtracted:
- """
- Merge AI results from chunked content back into a single ContentExtracted.
-
- Args:
- extractedContent: List of ContentExtracted objects that were processed
- aiResults: List of AI response strings, one per chunk
- strategy: Merge strategy configuration (dict or MergeStrategy object)
-
- Returns:
- Single ContentExtracted with merged AI results
- """
- logger.debug(f"=== MERGING AI RESULTS ===")
- logger.debug(f"Extracted content: {len(extractedContent)} documents")
- logger.debug(f"AI results: {len(aiResults)} responses")
- logger.debug(f"Merge strategy: {strategy.mergeType}")
-
- mergeStrategy = strategy
-
- # Collect all parts from all extracted content
- allParts: List[ContentPart] = []
- for ec in extractedContent:
- allParts.extend(ec.parts)
-
- logger.debug(f"Total original parts: {len(allParts)}")
-
- # Create AI result parts
- aiResultParts: List[ContentPart] = []
- for i, aiResult in enumerate(aiResults):
- aiPart = ContentPart(
- id=f"ai_result_{i}",
- parentId=None, # Will be set based on strategy
- label="ai_result",
- typeGroup="text",
- mimeType="text/plain",
- data=aiResult,
- metadata={
- "aiResult": True,
- "order": i,
- "size": len(aiResult.encode('utf-8'))
- }
- )
- aiResultParts.append(aiPart)
-
- logger.debug(f"Created {len(aiResultParts)} AI result parts")
-
- # Apply merging strategy
- if mergeStrategy.mergeType == "concatenate":
- mergedParts = self._mergeConcatenate(allParts, aiResultParts, mergeStrategy)
- elif mergeStrategy.mergeType == "hierarchical":
- mergedParts = self._mergeHierarchical(allParts, aiResultParts, mergeStrategy)
- elif mergeStrategy.mergeType == "intelligent":
- mergedParts = self._mergeIntelligent(allParts, aiResultParts, mergeStrategy)
- else:
- # Default to concatenate
- mergedParts = self._mergeConcatenate(allParts, aiResultParts, mergeStrategy)
-
- # Create final ContentExtracted
- mergedContent = ContentExtracted(
- id=f"merged_{uuid.uuid4()}",
- parts=mergedParts
- )
-
- logger.debug(f"=== MERGE COMPLETED ===")
- logger.debug(f"Final merged parts: {len(mergedParts)}")
- logger.debug(f"Merged content ID: {mergedContent.id}")
-
- return mergedContent
-
- def _mergeConcatenate(
- self,
- originalParts: List[ContentPart],
- aiResultParts: List[ContentPart],
- strategy: MergeStrategy
- ) -> List[ContentPart]:
- """Merge parts by simple concatenation."""
- mergedParts = []
-
- # Add original parts (filtered if needed)
- for part in originalParts:
- if strategy.preserveChunks or not part.metadata.get("chunk", False):
- mergedParts.append(part)
-
- # Add AI results
- if aiResultParts:
- # Group AI results by parentId if available
- aiResultsByParent = {}
- for aiPart in aiResultParts:
- parentId = aiPart.parentId or "root"
- if parentId not in aiResultsByParent:
- aiResultsByParent[parentId] = []
- aiResultsByParent[parentId].append(aiPart)
-
- # Merge AI results for each parent
- for parentId, aiParts in aiResultsByParent.items():
- if len(aiParts) == 1:
- mergedParts.append(aiParts[0])
- else:
- # Concatenate multiple AI results for same parent
- combinedData = strategy.chunkSeparator.join([p.data for p in aiParts])
- combinedPart = ContentPart(
- id=f"merged_ai_{parentId}",
- parentId=parentId if parentId != "root" else None,
- label="merged_ai_result",
- typeGroup="text",
- mimeType="text/plain",
- data=combinedData,
- metadata={
- "aiResult": True,
- "merged": True,
- "sourceCount": len(aiParts),
- "size": len(combinedData.encode('utf-8'))
- }
- )
- mergedParts.append(combinedPart)
-
- return mergedParts
-
- def _mergeHierarchical(
- self,
- originalParts: List[ContentPart],
- aiResultParts: List[ContentPart],
- strategy: MergeStrategy
- ) -> List[ContentPart]:
- """Merge parts hierarchically based on parentId relationships."""
- # Group parts by parentId
- partsByParent = {}
- for part in originalParts:
- parentId = part.parentId or "root"
- if parentId not in partsByParent:
- partsByParent[parentId] = []
- partsByParent[parentId].append(part)
-
- # Group AI results by parentId
- aiResultsByParent = {}
- for aiPart in aiResultParts:
- parentId = aiPart.parentId or "root"
- if parentId not in aiResultsByParent:
- aiResultsByParent[parentId] = []
- aiResultsByParent[parentId].append(aiPart)
-
- mergedParts = []
-
- # Process each parent group
- for parentId in set(list(partsByParent.keys()) + list(aiResultsByParent.keys())):
- originalGroup = partsByParent.get(parentId, [])
- aiGroup = aiResultsByParent.get(parentId, [])
-
- # Add original parts
- mergedParts.extend(originalGroup)
-
- # Add AI results for this parent
- if aiGroup:
- if len(aiGroup) == 1:
- mergedParts.append(aiGroup[0])
- else:
- # Merge multiple AI results
- combinedData = strategy.chunkSeparator.join([p.data for p in aiGroup])
- combinedPart = ContentPart(
- id=f"hierarchical_ai_{parentId}",
- parentId=parentId if parentId != "root" else None,
- label="hierarchical_ai_result",
- typeGroup="text",
- mimeType="text/plain",
- data=combinedData,
- metadata={
- "aiResult": True,
- "hierarchical": True,
- "sourceCount": len(aiGroup),
- "size": len(combinedData.encode('utf-8'))
- }
- )
- mergedParts.append(combinedPart)
-
- return mergedParts
-
- def _mergeIntelligent(
- self,
- originalParts: List[ContentPart],
- aiResultParts: List[ContentPart],
- strategy: MergeStrategy
- ) -> List[ContentPart]:
- """Merge parts using intelligent strategies based on content type."""
- mergedParts = []
-
- # Group by typeGroup for intelligent merging
- partsByType = {}
- for part in originalParts:
- typeGroup = part.typeGroup
- if typeGroup not in partsByType:
- partsByType[typeGroup] = []
- partsByType[typeGroup].append(part)
-
- # Process each type group
- for typeGroup, parts in partsByType.items():
- if typeGroup == "text":
- mergedParts.extend(self._mergeTextIntelligent(parts, aiResultParts, strategy))
- elif typeGroup == "table":
- mergedParts.extend(self._mergeTableIntelligent(parts, aiResultParts, strategy))
- elif typeGroup == "structure":
- mergedParts.extend(self._mergeStructureIntelligent(parts, aiResultParts, strategy))
- else:
- # Default handling for other types
- mergedParts.extend(parts)
-
- # Add any remaining AI results that weren't merged
- for aiPart in aiResultParts:
- if not any(p.id == aiPart.id for p in mergedParts):
- mergedParts.append(aiPart)
-
- return mergedParts
-
- def _mergeTextIntelligent(
- self,
- textParts: List[ContentPart],
- aiResultParts: List[ContentPart],
- strategy: MergeStrategy
- ) -> List[ContentPart]:
- """Intelligent merging for text content."""
- # For now, use concatenate strategy
- # This could be enhanced with semantic analysis, summarization, etc.
- return self._mergeConcatenate(textParts, aiResultParts, strategy)
-
- def _mergeTableIntelligent(
- self,
- tableParts: List[ContentPart],
- aiResultParts: List[ContentPart],
- strategy: MergeStrategy
- ) -> List[ContentPart]:
- """Intelligent merging for table content."""
- # For now, use concatenate strategy
- # This could be enhanced with table merging logic
- return self._mergeConcatenate(tableParts, aiResultParts, strategy)
-
- def _mergeStructureIntelligent(
- self,
- structureParts: List[ContentPart],
- aiResultParts: List[ContentPart],
- strategy: MergeStrategy
- ) -> List[ContentPart]:
- """Intelligent merging for structured content."""
- # For now, use concatenate strategy
- # This could be enhanced with structure-aware merging
- return self._mergeConcatenate(structureParts, aiResultParts, strategy)
-
async def processDocumentsPerChunk(
self,
documents: List[ChatDocument],
@@ -756,11 +530,15 @@ class ExtractionService:
return processedResults
def _convertToContentParts(
- self, partResults: Union[List[PartResult], List[AiCallResponse]]
+ self, partResults: Union[List[PartResult], List[AiCallResponse]], originalContentParts: Optional[List[ContentPart]] = None
) -> List[ContentPart]:
"""Convert part results to ContentParts (internal helper for consolidation).
Handles both PartResult (from extraction workflow) and AiCallResponse (from content parts processing).
+
+ Args:
+ partResults: List of PartResult or AiCallResponse objects
+ originalContentParts: Optional list of original ContentPart objects to preserve typeGroup and metadata
"""
content_parts = []
@@ -794,14 +572,30 @@ class ExtractionService:
elif isinstance(partResults[0], AiCallResponse):
# Logic from interfaceAiObjects (from content parts processing)
# Phase 7: Add originalIndex for explicit ordering
+ # REQUIRED: originalContentParts must be provided for AiCallResponse path to preserve typeGroup
+ if not originalContentParts:
+ raise ValueError("originalContentParts is required when merging AiCallResponse objects. All callers must provide the original ContentPart objects to preserve typeGroup.")
+
for i, result in enumerate(partResults):
if result.content:
+ # Handle one-to-many relationships (e.g., chunking: 1 contentPart -> N chunkResults)
+ # If we have fewer originalContentParts than partResults, use the first one for all
+ if i < len(originalContentParts):
+ originalPart = originalContentParts[i]
+ else:
+ # One-to-many: use first originalContentPart for remaining results
+ originalPart = originalContentParts[0]
+
+ originalTypeGroup = originalPart.typeGroup or "text"
+ originalMimeType = originalPart.mimeType or "text/plain"
+ originalLabel = originalPart.label or f"ai_result_{i}"
+
content_part = ContentPart(
id=str(uuid.uuid4()),
parentId=None,
- label=f"ai_result_{i}",
- typeGroup="text", # Default to text for AI results
- mimeType="text/plain",
+ label=originalLabel,
+ typeGroup=originalTypeGroup, # Preserve original typeGroup from originalContentParts
+ mimeType=originalMimeType,
data=result.content,
metadata={
"aiResult": True,
@@ -821,17 +615,23 @@ class ExtractionService:
def mergePartResults(
self,
partResults: Union[List[PartResult], List[AiCallResponse]],
- options: Optional[AiCallOptions] = None
+ options: Optional[AiCallOptions] = None,
+ originalContentParts: Optional[List[ContentPart]] = None
) -> str:
"""Unified merge for both PartResult and AiCallResponse.
Consolidated from both interfaceAiObjects.py and existing serviceExtraction method.
+
+ Args:
+ partResults: List of PartResult or AiCallResponse objects to merge
+ options: Optional AiCallOptions for merge strategy
+ originalContentParts: Optional list of original ContentPart objects to preserve typeGroup
"""
if not partResults:
return ""
- # Convert to ContentParts using unified helper
- content_parts = self._convertToContentParts(partResults)
+ # Convert to ContentParts using unified helper, preserving original typeGroup
+ content_parts = self._convertToContentParts(partResults, originalContentParts)
# Determine merge strategy based on input type
if isinstance(partResults[0], PartResult):
@@ -852,7 +652,19 @@ class ExtractionService:
mergeType="concatenate"
)
- # Apply merging
+ # Check if this is a JSON extraction response format (extracted_content structure)
+ # If so, merge JSON structures intelligently before applying regular merging
+ isJsonExtractionResponse = self._isJsonExtractionResponse(content_parts)
+
+ if isJsonExtractionResponse:
+ # Merge JSON extraction responses intelligently
+ logger.info(f"Detected JSON extraction response format - merging {len(content_parts)} JSON responses")
+ merged_json = self._mergeJsonExtractionResponses(content_parts, originalContentParts)
+ merged_json_str = json.dumps(merged_json, indent=2, ensure_ascii=False)
+ logger.info(f"Successfully merged JSON extraction responses into single unified JSON ({len(merged_json_str)} chars)")
+ return merged_json_str
+
+ # Apply regular merging for non-JSON extraction responses
merged_parts = applyMerging(content_parts, merge_strategy)
# Phase 6: Enhanced format with metadata preservation
@@ -897,6 +709,333 @@ class ExtractionService:
logger.info(f"Merged {len(partResults)} parts using unified merging system with metadata preservation (generationResponse={isGenerationResponse})")
return final_content.strip()
+ def _isJsonExtractionResponse(self, content_parts: List[ContentPart]) -> bool:
+ """Check if contentParts contain JSON extraction responses (extracted_content format)."""
+ if not content_parts:
+ return False
+
+ # Check first part to see if it's JSON extraction response format
+ firstPartData = content_parts[0].data if content_parts[0].data else ""
+ if not isinstance(firstPartData, str):
+ return False
+
+ # Strip markdown code fences (```json ... ```) before checking
+ strippedData = stripCodeFences(firstPartData.strip())
+
+ # Check if it starts with JSON object/array
+ if not strippedData.startswith(('{', '[')):
+ return False
+
+ try:
+ parsed = json.loads(strippedData)
+ # Check if it has the extraction response structure: {"extracted_content": {...}}
+ if isinstance(parsed, dict) and "extracted_content" in parsed:
+ return True
+ except:
+ pass
+
+ return False
+
+ def _mergeJsonExtractionResponses(self, content_parts: List[ContentPart], originalContentParts: Optional[List[ContentPart]] = None) -> Dict[str, Any]:
+ """Merge multiple JSON extraction responses into one unified response.
+
+ Merges:
+ - Tables: Combines all table rows, preserves headers, removes duplicates
+ - Text: Combines all text blocks
+ - Headings: Combines all headings arrays
+ - Lists: Combines all list items
+ - Images: Combines all image descriptions
+ """
+ merged = {
+ "extracted_content": {
+ "text": "",
+ "tables": [],
+ "headings": [],
+ "lists": [],
+ "images": []
+ }
+ }
+
+ # Track table headers to merge tables with same structure
+ table_headers_map: Dict[str, List[Dict[str, Any]]] = {} # headers_tuple -> [tables]
+ all_text_parts = []
+ all_headings = []
+ all_lists = []
+ all_images = []
+
+ # Collect per-part extracted data for debug file
+ per_part_extracted_data = []
+ # Track original parts and their extracted data
+ original_parts_extracted_data = []
+
+ for part_idx, part in enumerate(content_parts, 1):
+ logger.info(f"=== Processing ContentPart {part_idx}/{len(content_parts)}: id={part.id}, label={part.label}, typeGroup={part.typeGroup} ===")
+
+ if not part.data:
+ logger.warning(f"ContentPart {part.id} has no data, skipping")
+ continue
+
+ # Handle multiple JSON blocks in a single response (separated by ---)
+ # Split by --- to handle multiple JSON blocks per ContentPart
+ partDataBlocks = part.data.split('---')
+ logger.debug(f"ContentPart {part.id}: Found {len(partDataBlocks)} JSON block(s) (split by ---)")
+
+ for block_idx, blockData in enumerate(partDataBlocks, 1):
+ if not blockData.strip():
+ continue
+
+ try:
+ # Strip markdown code fences before parsing
+ strippedData = stripCodeFences(blockData.strip())
+ if not strippedData:
+ logger.debug(f"ContentPart {part.id}, Block {block_idx}: Empty after stripping code fences")
+ continue
+
+ parsed = json.loads(strippedData)
+ if not isinstance(parsed, dict) or "extracted_content" not in parsed:
+ logger.debug(f"ContentPart {part.id}, Block {block_idx}: Not a valid extraction response format")
+ continue
+
+ extracted = parsed["extracted_content"]
+
+ # Find corresponding original part (if available)
+ original_part = None
+ if originalContentParts and part_idx <= len(originalContentParts):
+ original_part = originalContentParts[part_idx - 1]
+ elif originalContentParts and len(originalContentParts) > 0:
+ # Handle one-to-many (chunking) - use first original part
+ original_part = originalContentParts[0]
+
+ # Store extracted data for this part/block for debug file
+ part_extracted = {
+ "contentPartId": part.id,
+ "contentPartLabel": part.label,
+ "contentPartTypeGroup": part.typeGroup,
+ "blockIndex": block_idx,
+ "extracted_content": extracted.copy() # Store full extracted content
+ }
+ per_part_extracted_data.append(part_extracted)
+
+ # Store original part extracted data
+ if original_part:
+ # Extract text from extracted_content for display
+ extracted_text = extracted.get("text", "") if isinstance(extracted.get("text"), str) else ""
+ if not extracted_text and extracted.get("tables"):
+ # If no text but has tables, create a text representation
+ table_texts = []
+ for table in extracted.get("tables", []):
+ if isinstance(table, dict):
+ headers = table.get("headers", [])
+ rows = table.get("rows", [])
+ if headers and rows:
+ table_texts.append(f"Table: {', '.join(headers)}\nRows: {len(rows)}")
+ extracted_text = "\n".join(table_texts) if table_texts else ""
+
+ original_part_data = {
+ "id": original_part.id,
+ "typeGroup": original_part.typeGroup,
+ "mimeType": original_part.mimeType or "text/plain",
+ "label": original_part.label,
+ "dataLength": len(extracted_text),
+ "metadata": {
+ "documentId": original_part.metadata.get("documentId") if original_part.metadata else None,
+ "documentMimeType": original_part.metadata.get("documentMimeType") if original_part.metadata else None,
+ "originalFileName": original_part.metadata.get("originalFileName") if original_part.metadata else None,
+ },
+ "data": extracted_text, # Full extracted text
+ "extracted_content": extracted.copy() # Full extracted content structure
+ }
+ original_parts_extracted_data.append(original_part_data)
+
+ # Log extracted content summary
+ extracted_summary = {
+ "text": len(extracted.get("text", "")) if extracted.get("text") else 0,
+ "tables": len(extracted.get("tables", [])) if isinstance(extracted.get("tables"), list) else 0,
+ "headings": len(extracted.get("headings", [])) if isinstance(extracted.get("headings"), list) else 0,
+ "lists": len(extracted.get("lists", [])) if isinstance(extracted.get("lists"), list) else 0,
+ "images": len(extracted.get("images", [])) if isinstance(extracted.get("images"), list) else 0,
+ }
+ logger.info(f"ContentPart {part.id}, Block {block_idx} extracted: text={extracted_summary['text']} chars, tables={extracted_summary['tables']}, headings={extracted_summary['headings']}, lists={extracted_summary['lists']}, images={extracted_summary['images']}")
+
+ # Log table details
+ if extracted_summary['tables'] > 0:
+ for table_idx, table in enumerate(extracted.get("tables", []), 1):
+ if isinstance(table, dict):
+ headers = table.get("headers", [])
+ rows = table.get("rows", [])
+ logger.info(f" Table {table_idx}: headers={headers}, rows={len(rows) if isinstance(rows, list) else 0}")
+
+ # Log list details
+ if extracted_summary['lists'] > 0:
+ for list_idx, list_item in enumerate(extracted.get("lists", []), 1):
+ if isinstance(list_item, dict):
+ list_type = list_item.get("type", "unknown")
+ items = list_item.get("items", [])
+ logger.info(f" List {list_idx}: type={list_type}, items={len(items) if isinstance(items, list) else 0}")
+
+ # Merge text
+ if "text" in extracted and extracted["text"]:
+ text_content = extracted["text"].strip()
+ if text_content:
+ all_text_parts.append(text_content)
+
+ # Merge tables - group by headers to merge compatible tables
+ if "tables" in extracted and isinstance(extracted["tables"], list):
+ for table in extracted["tables"]:
+ if not isinstance(table, dict) or "headers" not in table or "rows" not in table:
+ continue
+
+ headers = table["headers"]
+ rows = table["rows"]
+
+ if not headers or not rows:
+ continue
+
+ # Use headers as key for grouping
+ headers_key = tuple(headers)
+ if headers_key not in table_headers_map:
+ table_headers_map[headers_key] = []
+ table_headers_map[headers_key].append(table)
+
+ # Merge headings
+ if "headings" in extracted and isinstance(extracted["headings"], list):
+ for heading in extracted["headings"]:
+ if isinstance(heading, dict) and "text" in heading:
+ all_headings.append(heading)
+
+ # Merge lists
+ if "lists" in extracted and isinstance(extracted["lists"], list):
+ for list_item in extracted["lists"]:
+ if isinstance(list_item, dict) and "items" in list_item:
+ all_lists.append(list_item)
+
+ # Merge images
+ if "images" in extracted and isinstance(extracted["images"], list):
+ for image in extracted["images"]:
+ if isinstance(image, dict) and "description" in image:
+ all_images.append(image)
+
+ except Exception as e:
+ logger.warning(f"Failed to parse JSON extraction response block from part {part.id}: {str(e)}")
+ continue
+
+ # Combine text parts
+ if all_text_parts:
+ merged["extracted_content"]["text"] = "\n\n".join(all_text_parts)
+
+ # Merge tables by headers - combine rows from tables with same headers
+ for headers_key, tables in table_headers_map.items():
+ # Collect all rows from tables with same headers
+ all_rows = []
+ seen_rows = set() # Track duplicates using tuple representation
+
+ for table in tables:
+ rows = table.get("rows", [])
+ for row in rows:
+ # Convert row to tuple for duplicate detection
+ row_tuple = tuple(str(cell) for cell in row)
+ if row_tuple not in seen_rows:
+ seen_rows.add(row_tuple)
+ all_rows.append(row)
+
+ # Create merged table
+ if all_rows:
+ merged["extracted_content"]["tables"].append({
+ "headers": list(headers_key),
+ "rows": all_rows
+ })
+
+ # Add headings
+ if all_headings:
+ merged["extracted_content"]["headings"] = all_headings
+
+ # Add lists - keep them separate (like headings) to preserve document structure
+ if all_lists:
+ merged["extracted_content"]["lists"] = all_lists
+
+ # Add images
+ if all_images:
+ merged["extracted_content"]["images"] = all_images
+
+ logger.info(f"=== Merging Summary ===")
+ logger.info(f"Total ContentParts processed: {len(content_parts)}")
+ logger.info(f"Text parts collected: {len(all_text_parts)}")
+ logger.info(f"Table groups (by headers): {len(table_headers_map)}")
+ logger.info(f"Headings collected: {len(all_headings)}")
+ logger.info(f"Lists collected: {len(all_lists)}")
+ logger.info(f"Images collected: {len(all_images)}")
+
+ # Log table merging details
+ for headers_key, tables in table_headers_map.items():
+ total_rows = sum(len(table.get("rows", [])) for table in tables)
+ logger.info(f" Table group with headers {list(headers_key)}: {len(tables)} table(s), {total_rows} total rows")
+
+ logger.info(f"Merged JSON extraction responses: {len(table_headers_map)} table groups, {len(all_text_parts)} text parts, {len(all_headings)} headings, {len(all_lists)} lists, {len(all_images)} images")
+
+ # Write per-part extracted data to debug file
+ if per_part_extracted_data and self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
+ try:
+ debug_content = {
+ "summary": {
+ "totalContentParts": len(content_parts),
+ "totalExtractedBlocks": len(per_part_extracted_data),
+ "mergedResult": {
+ "textParts": len(all_text_parts),
+ "tableGroups": len(table_headers_map),
+ "headings": len(all_headings),
+ "lists": len(all_lists),
+ "images": len(all_images)
+ }
+ },
+ "perPartExtractedData": per_part_extracted_data
+ }
+ debug_json = json.dumps(debug_content, indent=2, ensure_ascii=False)
+ self.services.utils.writeDebugFile(debug_json, "content_extraction_per_part")
+ logger.info(f"Wrote per-part extracted data to debug file: {len(per_part_extracted_data)} blocks from {len(content_parts)} content parts")
+ except Exception as e:
+ logger.warning(f"Failed to write per-part extracted data to debug file: {str(e)}")
+
+ # Write original parts extracted data in extraction_result format
+ if original_parts_extracted_data and self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
+ try:
+ # Get document info from first original part if available
+ document_name = None
+ document_mime_type = None
+ if originalContentParts and len(originalContentParts) > 0:
+ first_part = originalContentParts[0]
+ if first_part.metadata:
+ document_name = first_part.metadata.get("originalFileName")
+ document_mime_type = first_part.metadata.get("documentMimeType")
+
+ # Format similar to extraction_result file
+ extraction_result_format = {
+ "documentName": document_name or "Unknown",
+ "documentMimeType": document_mime_type or "application/octet-stream",
+ "partsCount": len(original_parts_extracted_data),
+ "parts": []
+ }
+
+ for part_data in original_parts_extracted_data:
+ # Format each part similar to extraction_result format
+ formatted_part = {
+ "typeGroup": part_data["typeGroup"],
+ "mimeType": part_data["mimeType"],
+ "label": part_data["label"],
+ "dataLength": part_data["dataLength"],
+ "metadata": part_data["metadata"],
+ "data": part_data["data"], # Full extracted text
+ "extracted_content": part_data["extracted_content"] # Full structure
+ }
+ extraction_result_format["parts"].append(formatted_part)
+
+ result_json = json.dumps(extraction_result_format, indent=2, ensure_ascii=False)
+ self.services.utils.writeDebugFile(result_json, "content_extraction_original_parts")
+ logger.info(f"Wrote original parts extracted data to debug file: {len(original_parts_extracted_data)} original parts")
+ except Exception as e:
+ logger.warning(f"Failed to write original parts extracted data to debug file: {str(e)}")
+
+ return merged
+
async def chunkContentPartForAi(self, contentPart, model, options, prompt: str = "") -> List[Dict[str, Any]]:
"""Chunk a content part based on model capabilities, accounting for prompt, system message overhead, and maxTokens output.
@@ -1162,7 +1301,8 @@ class ExtractionService:
if not chunkResults:
raise ValueError(f"All chunks failed for content part")
- mergedContent = self.mergePartResults(chunkResults, options)
+ # Pass original contentPart to preserve typeGroup for all chunks (one-to-many: 1 part -> N chunks)
+ mergedContent = self.mergePartResults(chunkResults, options, [contentPart])
return AiCallResponse(
content=mergedContent,
modelName=model.name,
@@ -1208,7 +1348,8 @@ class ExtractionService:
raise
# Merge chunk results using unified mergePartResults
- mergedContent = self.mergePartResults(chunkResults, options)
+ # Pass original contentPart to preserve typeGroup for all chunks (one-to-many: 1 part -> N chunks)
+ mergedContent = self.mergePartResults(chunkResults, options, [contentPart])
logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)")
return AiCallResponse(
@@ -1258,6 +1399,9 @@ class ExtractionService:
Moved from interfaceAiObjects.callWithContentParts() - entry point for content parts processing.
Uses parallel processing similar to section generation for better performance.
+
+    NOTE: For DATA_EXTRACT operations, parts are still processed separately, but each part's
+    response uses the flat extracted_content format so results can be merged afterwards
"""
prompt = request.prompt
options = request.options
@@ -1274,6 +1418,12 @@ class ExtractionService:
if totalParts == 0:
return self._createErrorResponse("No content parts to process", 0, 0)
+    # NOTE: For DATA_EXTRACT operations, the extraction prompt asks the AI to return ONE JSON
+    # object per ContentPart in the flat extracted_content format. Parts are processed
+    # separately; mergePartResults then merges the per-part JSON responses into a single
+    # unified structure (see _mergeJsonExtractionResponses).
+
+ # DEFAULT: Process parts in parallel
# Thread-safe counter for progress tracking
completedCount = [0] # Use list to allow modification in nested function
@@ -1330,8 +1480,8 @@ class ExtractionService:
elif result is not None:
allResults.append(result)
- # Merge all results using unified mergePartResults
- mergedContent = self.mergePartResults(allResults)
+ # Merge all results using unified mergePartResults, preserving original typeGroup
+ mergedContent = self.mergePartResults(allResults, options, contentParts)
return AiCallResponse(
content=mergedContent,
diff --git a/modules/services/serviceExtraction/subPromptBuilderExtraction.py b/modules/services/serviceExtraction/subPromptBuilderExtraction.py
index 7e065e22..b24bed13 100644
--- a/modules/services/serviceExtraction/subPromptBuilderExtraction.py
+++ b/modules/services/serviceExtraction/subPromptBuilderExtraction.py
@@ -45,58 +45,51 @@ async def buildExtractionPrompt(
Complete extraction prompt string
"""
- # Unified multi-file example (single doc = multi with n=1)
+ # Flat extraction format - returns extracted content as structured data, not documents/sections
+ # This format allows merging multiple contentParts into one response
json_example = {
- "metadata": {
- "title": "Multi-Document Example",
- "split_strategy": "by_section",
- "source_documents": ["doc_001"],
- "extraction_method": "ai_extraction"
- },
- "documents": [
- {
- "id": "doc_section_1",
- "title": "Section 1 Title",
- "filename": "section_1.xlsx",
- "sections": [
- {
- "id": "section_1",
- "content_type": "heading",
- "elements": [
- {
- "level": 1,
- "text": "1. SECTION TITLE"
- }
- ],
- "order": 1
- },
- {
- "id": "section_2",
- "content_type": "paragraph",
- "elements": [
- {
- "text": "This is the actual content that should be extracted from the document."
- }
- ],
- "order": 2
- },
- {
- "id": "section_3",
- "content_type": "table",
- "elements": [
- {
- "headers": ["Column 1", "Column 2"],
- "rows": [["Value 1", "Value 2"]]
- }
- ],
- "order": 3
- }
- ]
- }
- ]
+ "extracted_content": {
+ "text": "Extracted text content from the document...",
+ "tables": [
+ {
+ "headers": ["Column 1", "Column 2"],
+ "rows": [
+ ["Value 1", "Value 2"],
+ ["Value 3", "Value 4"]
+ ]
+ }
+ ],
+ "headings": [
+ {
+ "level": 1,
+ "text": "Main Heading"
+ },
+ {
+ "level": 2,
+ "text": "Subheading"
+ }
+ ],
+ "lists": [
+ {
+ "type": "bullet",
+ "items": ["Item 1", "Item 2", "Item 3"]
+ }
+ ],
+ "images": [
+ {
+ "description": "Description of image content, including all visible text, tables, and visual elements"
+ }
+ ]
+ }
}
- structure_instruction = "CRITICAL: You MUST return a JSON structure with a \"documents\" array. For single documents, create one document entry with all sections."
+ structure_instruction = """CRITICAL EXTRACTION REQUIREMENTS:
+1. Extract content from the provided ContentPart(s) - process what is provided in this call
+2. If this ContentPart contains tables, extract them with proper structure (headers and rows)
+3. If this ContentPart contains text, extract it as structured text
+4. Return ONE JSON object with extracted content from this ContentPart
+5. Preserve all original data - do not summarize or interpret
+6. The system will merge results from multiple ContentParts automatically - focus on extracting this ContentPart's content accurately"""
# Parse extraction intent if AI service is available
extraction_intent = await _parseExtractionIntent(userPrompt, outputFormat, aiService, services) if aiService else userPrompt
@@ -124,30 +117,25 @@ USER REQUEST / USER PROMPT:
END OF USER REQUEST / USER PROMPT
{'='*80}
-You are a document processing assistant that extracts and structures content from documents. Your task is to analyze the provided document content and create a structured JSON output.
+You are a document processing assistant that extracts content from documents. Your task is to analyze the provided ContentPart(s) and extract their content into a structured JSON format.
-TASK: Extract the actual content from the document and organize it into documents. For single documents, create one document entry. For multi-document requests, create multiple document entries.
+TASK: Extract content from the provided ContentPart(s). Extract all tables, text, headings, lists, and other content types accurately. The system processes ContentParts individually and merges results automatically.
LANGUAGE REQUIREMENT: All extracted content must be in the language '{userLanguage}'. Extract and preserve content in this language.
{extraction_intent}
-REQUIREMENTS:
-1. Analyze the document content provided in the context below
-2. Identify distinct sections in the document (by headings, topics, or logical breaks)
-3. Create one or more JSON document entries based on the content structure
-4. Extract the real content from each section (headings, paragraphs, lists, etc.)
-5. Generate appropriate filenames for each document
-
{structure_instruction}
OUTPUT FORMAT: Return only valid JSON in this exact structure:
{json.dumps(json_example, indent=2)}
-Requirements:
+CRITICAL EXTRACTION RULES:
+- Extract only content that is ACTUALLY PRESENT in the ContentPart - never create fake or placeholder data
+- Return empty arrays [] or empty strings "" when content is missing - this is normal and expected
+- Extract all tables, text, headings, lists accurately with proper structure
- Preserve all original data - do not summarize or interpret
-- Use the exact JSON format shown above
-- Maintain data integrity and structure
+- Return ONE JSON object per ContentPart (the system merges multiple ContentParts automatically)
Content Types to Extract:
1. Tables: Extract all rows and columns with proper headers
@@ -166,7 +154,7 @@ Image Analysis Requirements:
Return only the JSON structure with actual data from the documents. Do not include any text before or after the JSON.
-Extract the ACTUAL CONTENT from the source documents. Do not use placeholder text like "Section 1", "Section 2", etc. Extract the real headings, paragraphs, and content from the documents.
+Extract only actual content from the ContentPart. Return empty arrays/strings when content is missing - never create fake data.
""".strip()
# Add renderer-specific guidelines if provided
diff --git a/modules/services/serviceGeneration/paths/ARCHITECTURE_ANALYSIS.md b/modules/services/serviceGeneration/paths/ARCHITECTURE_ANALYSIS.md
new file mode 100644
index 00000000..5ba586a7
--- /dev/null
+++ b/modules/services/serviceGeneration/paths/ARCHITECTURE_ANALYSIS.md
@@ -0,0 +1,114 @@
+# Document Generation Architecture Analysis
+
+## Current Flow
+
+### 1. Document Input → ContentParts (`extractAndPrepareContent`)
+
+**Location**: `gateway/modules/services/serviceAi/subContentExtraction.py`
+
+**Flow**:
+- Regular documents → Calls `extractContent()` (NON-AI extraction) → Creates contentParts with raw extracted text
+- **BUT THEN**:
+ - Images with "extract" intent → Calls Vision AI (line 190) → AI extraction
+ - Text with "extract" intent + extractionPrompt → Calls AI processing (line 265) → AI extraction
+- Pre-extracted JSON → Uses contentParts directly (no AI)
+
+**Result**: ContentParts may already be AI-processed before structure generation
+
+### 2. Structure Generation
+
+**Location**: `gateway/modules/services/serviceAi/subStructureGeneration.py`
+
+**Flow**:
+- Uses contentParts (may already be AI-processed)
+- Generates document structure (chapters, sections)
+
+### 3. Section Generation (`_processSingleSection`)
+
+**Location**: `gateway/modules/services/serviceAi/subStructureFilling.py`
+
+**Flow**:
+- Uses contentParts (which may already be AI-processed)
+- Aggregates "extracted" contentParts with AI (line 554-682)
+- Generates section content using `callAiWithLooping` with `useCaseId="section_content"`
+
+## Issues Identified
+
+### Issue 1: Duplicate AI Processing
+- AI extraction happens in `extractAndPrepareContent` (for images/text)
+- AI generation happens again in section generation
+- This is redundant and inefficient
+
+### Issue 2: Architecture Inconsistency
+- Pre-extracted JSON files → contentParts directly (no AI)
+- Regular documents → contentParts + AI extraction (inconsistent)
+- User wants: Documents → contentParts (like pre-extracted JSON) → AI only in section generation
+
+### Issue 3: Image Processing
+- Images need Vision AI to extract text
+- Currently happens in `extractAndPrepareContent`
+- Question: Should this happen during section generation instead?
+
+## Proposed Architecture
+
+### Option A: Remove All AI from `extractAndPrepareContent`
+- Documents → `extractContent()` → Raw contentParts (text, tables, etc.)
+- Images → Keep as image contentParts (no Vision AI extraction)
+- Section generation → Handle images with Vision AI when needed
+
+**Pros**:
+- Consistent with pre-extracted JSON flow
+- Single point of AI processing (section generation)
+- Clear separation of concerns
+
+**Cons**:
+- Images won't have extracted text until section generation
+- May need to handle images differently in section generation
+
+### Option B: Keep Vision AI for Images Only
+- Documents → `extractContent()` → Raw contentParts
+- Images → Vision AI extraction → Text contentParts
+- Section generation → Uses text contentParts (no additional AI extraction)
+
+**Pros**:
+- Images get text extracted early
+- Section generation can use text directly
+
+**Cons**:
+- Still has AI extraction before structure generation
+- Inconsistent with user's request
+
+## Recommendation
+
+**Follow Option A** - Remove all AI extraction from `extractAndPrepareContent`:
+
+1. **Documents → ContentParts** (like pre-extracted JSON):
+ - Call `extractContent()` (NON-AI)
+ - Create contentParts with raw extracted content
+ - Images remain as image contentParts (no Vision AI)
+
+2. **Section Generation**:
+ - Handle images with Vision AI when needed
+ - Aggregate all contentParts with AI
+ - Single point of AI processing
+
+**Benefits**:
+- Clear architecture: Documents = raw contentParts
+- Consistent with pre-extracted JSON flow
+- AI processing only where needed (section generation)
+- Easier to understand and maintain
+
+## Questions to Resolve
+
+1. **Image handling**: How should images be processed during section generation?
+ - Option 1: Vision AI extraction happens automatically when image contentParts are used
+ - Option 2: Images are passed to AI with Vision models during section generation
+ - Option 3: Images remain as binary and are rendered directly (no text extraction)
+
+2. **Text with extractionPrompt**: Should text contentParts with extractionPrompt be processed differently?
+ - Currently: AI processing in `extractAndPrepareContent`
+ - Proposed: Raw text → AI processing during section generation
+
+3. **Performance**: Will deferring image extraction to section generation cause performance issues?
+ - Need to test with multiple images
+
diff --git a/modules/services/serviceGeneration/paths/ARCHITECTURE_CHANGES.md b/modules/services/serviceGeneration/paths/ARCHITECTURE_CHANGES.md
new file mode 100644
index 00000000..3af38ef4
--- /dev/null
+++ b/modules/services/serviceGeneration/paths/ARCHITECTURE_CHANGES.md
@@ -0,0 +1,77 @@
+# Architecture Changes Summary
+
+## Problem Identified
+
+The architecture had AI extraction happening in TWO places:
+1. **`extractAndPrepareContent`**: Vision AI for images, AI processing for text with extractionPrompt
+2. **Section generation**: AI aggregation of contentParts
+
+This was:
+- Redundant (double AI processing)
+- Inconsistent (pre-extracted JSON had no AI, regular documents had AI)
+- Against the desired architecture (documents should become contentParts like pre-extracted JSON)
+
+## Solution Implemented
+
+### 1. Removed AI Extraction from `extractAndPrepareContent`
+
+**File**: `gateway/modules/services/serviceAi/subContentExtraction.py`
+
+**Changes**:
+- **Removed**: Vision AI extraction for images (lines 186-246)
+- **Removed**: AI text processing with extractionPrompt (lines 260-334)
+- **Updated**: Images with extract intent are now marked with `needsVisionExtraction=True` flag
+- **Updated**: Regular documents mark images with `needsVisionExtraction=True` when extract intent is present
+
+**Result**: Documents → contentParts (raw extraction only, no AI)
+
+### 2. Added Vision AI Extraction in Section Generation
+
+**File**: `gateway/modules/services/serviceAi/subStructureFilling.py`
+
+**Changes**:
+- **Added**: Vision AI extraction logic before aggregation (lines 553-610)
+- **Added**: Vision AI extraction logic for single-part processing (lines 1074-1115)
+- **Logic**:
+ - Checks if `part.typeGroup == "image"` AND `needsVisionExtraction == True` AND `intent == "extract"`
+ - Extracts text using Vision AI (`IMAGE_ANALYSE` operation)
+ - Replaces image part with text part for further processing
+ - Images with `contentFormat == "object"` (render intent) are rendered directly (no extraction)
+
+**Result**: AI extraction happens ONLY during section generation
+
+## Architecture Flow (After Changes)
+
+### Document Input → ContentParts
+1. **Regular documents**: `extractContent()` (NON-AI) → Raw contentParts
+ - Images with extract intent: `contentFormat="extracted"`, `needsVisionExtraction=True`
+ - Images with render intent: `contentFormat="object"` (rendered directly)
+ - Text: `contentFormat="extracted"` (raw text, no AI processing)
+
+2. **Pre-extracted JSON**: Direct contentParts (no changes)
+
+### Section Generation → AI Processing
+1. **Images with extract intent**: Vision AI extraction → Text part → AI aggregation
+2. **Images with render intent**: Rendered directly (no extraction)
+3. **Text contentParts**: AI aggregation with extractionPrompt (if provided)
+
+## Key Benefits
+
+1. **Consistent Architecture**: Documents = raw contentParts (like pre-extracted JSON)
+2. **Single Point of AI Processing**: Only in section generation
+3. **Clear Separation**: Extraction vs Generation
+4. **Intent-Based Logic**:
+ - `intent == "extract"` → Vision AI extraction during section generation
+ - `intent == "render"` → Direct rendering (no extraction)
+ - `contentFormat == "object"` → Embedded/referenced images (no extraction)
+
+## Testing Checklist
+
+- [ ] Regular documents create contentParts without AI extraction
+- [ ] Images with extract intent are marked with `needsVisionExtraction=True`
+- [ ] Images with render intent are marked with `contentFormat="object"`
+- [ ] Section generation extracts images with Vision AI when needed
+- [ ] Section generation renders images with object format directly
+- [ ] Text contentParts are processed with AI during section generation
+- [ ] Pre-extracted JSON flow still works correctly
+
diff --git a/modules/services/serviceGeneration/paths/documentPath.py b/modules/services/serviceGeneration/paths/documentPath.py
index d03c82a0..cb2caae3 100644
--- a/modules/services/serviceGeneration/paths/documentPath.py
+++ b/modules/services/serviceGeneration/paths/documentPath.py
@@ -86,105 +86,10 @@ class DocumentGenerationPath:
contentParts = preparedContentParts
- # Schritt 5B.5: Process contentParts with AI extraction (if provided)
- # This extracts text from images, processes content, and updates contentParts with extracted data
- # This matches the original flow: extract content first (no AI), then process with AI
+ # Schritt 5B.5: Documents are converted to contentParts (like pre-processed JSON files)
+ # No AI extraction here - AI extraction happens during section generation
if contentParts:
- # Filter out binary/other parts that shouldn't be processed
- processableParts = []
- skippedParts = []
- for p in contentParts:
- if p.typeGroup in ["image", "text", "table", "structure"] or (p.mimeType and (p.mimeType.startswith("image/") or p.mimeType.startswith("text/"))):
- processableParts.append(p)
- else:
- skippedParts.append(p)
-
- if skippedParts:
- logger.debug(f"Skipping {len(skippedParts)} binary/other parts from document generation")
-
- if processableParts:
- # Count images for progress update
- imageCount = len([p for p in processableParts if p.typeGroup == "image" or (p.mimeType and p.mimeType.startswith("image/"))])
- if imageCount > 0:
- self.services.chat.progressLogUpdate(docOperationId, 0.25, f"Extracting data from {imageCount} images using vision models")
-
- # Build proper extraction prompt using buildExtractionPrompt
- # This creates a focused extraction prompt, not the user's generation prompt
- from modules.services.serviceExtraction.subPromptBuilderExtraction import buildExtractionPrompt
- from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
-
- # Determine renderer for format-specific guidelines
- renderer = None
- if outputFormat:
- try:
- from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
- generationService = GenerationService(self.services)
- renderer = generationService.getRendererForFormat(outputFormat)
- except Exception as e:
- logger.debug(f"Could not get renderer for format {outputFormat}: {e}")
-
- extractionPrompt = await buildExtractionPrompt(
- outputFormat=outputFormat or "txt",
- userPrompt=userPrompt, # User's prompt as context for what to extract
- title=title or "Document",
- aiService=self.services.ai if hasattr(self.services.ai, 'aiObjects') and self.services.ai.aiObjects else None,
- services=self.services,
- renderer=renderer
- )
-
- logger.info(f"Processing {len(processableParts)} content parts ({imageCount} images) with extraction prompt")
-
- # Update progress - starting extraction
- self.services.chat.progressLogUpdate(docOperationId, 0.26, f"Starting AI extraction from {len(processableParts)} content parts")
-
- # Use DATA_EXTRACT operation type for extraction
- extractionOptions = AiCallOptions(
- operationType=OperationTypeEnum.DATA_EXTRACT, # Use DATA_EXTRACT for extraction
- compressPrompt=False,
- compressContext=False
- )
-
- # Create progress callback for per-part progress updates
- def extractionProgressCallback(progress: float, message: str):
- """Progress callback for extraction - updates parent operation."""
- # Map progress from 0.0-1.0 to 0.26-0.35 range (extraction phase)
- mappedProgress = 0.26 + (progress * 0.09) # 0.26 to 0.35
- self.services.chat.progressLogUpdate(docOperationId, mappedProgress, message)
-
- extractionRequest = AiCallRequest(
- prompt=extractionPrompt, # Use proper extraction prompt, not user's generation prompt
- context="",
- options=extractionOptions,
- contentParts=processableParts
- )
-
- # Write debug file for extraction prompt (all parts)
- self.services.utils.writeDebugFile(extractionPrompt, "content_extraction_prompt")
-
- # Call AI to extract content from contentParts (with progress callback)
- extractionResponse = await self.services.ai.callAi(extractionRequest, progressCallback=extractionProgressCallback)
-
- # Update progress - extraction completed
- self.services.chat.progressLogUpdate(docOperationId, 0.35, f"Completed AI extraction from {len(processableParts)} content parts")
-
- # Write debug file for extraction response
- if extractionResponse.content:
- self.services.utils.writeDebugFile(extractionResponse.content, "content_extraction_response")
- else:
- self.services.utils.writeDebugFile(f"Error: No content returned (errorCount={extractionResponse.errorCount})", "content_extraction_response")
- logger.warning(f"Content extraction returned no content (errorCount={extractionResponse.errorCount})")
-
- # Update contentParts with extracted content (matching original flow)
- if extractionResponse.errorCount == 0 and extractionResponse.content:
- # The extracted content is already merged - update the first processable part with it
- # This matches the original behavior where extracted text was used for generation
- if processableParts:
- # Store extracted content in metadata for use in structure generation
- processableParts[0].metadata["extractedContent"] = extractionResponse.content
- logger.info(f"Successfully extracted content from {len(processableParts)} parts ({len(extractionResponse.content)} chars)")
- else:
- # Extraction failed - log warning but continue
- logger.warning(f"Content extraction failed, continuing with original contentParts")
+ logger.info(f"Using {len(contentParts)} content parts for generation (no AI extraction at this stage)")
# Schritt 5C: Generiere Struktur
structure = await self.services.ai.generateStructure(
diff --git a/modules/services/serviceGeneration/renderers/rendererXlsx.py b/modules/services/serviceGeneration/renderers/rendererXlsx.py
index 24c620d2..d0074394 100644
--- a/modules/services/serviceGeneration/renderers/rendererXlsx.py
+++ b/modules/services/serviceGeneration/renderers/rendererXlsx.py
@@ -9,7 +9,12 @@ from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List
import io
import base64
-from datetime import datetime, UTC
+from datetime import datetime, UTC, date
+try:
+ from dateutil import parser as date_parser
+ DATEUTIL_AVAILABLE = True
+except ImportError:
+ DATEUTIL_AVAILABLE = False
try:
from openpyxl import Workbook
@@ -976,8 +981,49 @@ class RendererXlsx(BaseRenderer):
self.logger.warning(f"Could not add section to sheet: {str(e)}")
return startRow + 1
+ def _parseDateString(self, text: str) -> Any:
+ """Try to parse a string as a date/datetime. Returns datetime object if successful, None otherwise."""
+ if not text or not isinstance(text, str):
+ return None
+
+ text = text.strip()
+ if not text:
+ return None
+
+ # Common date formats to try (in order of likelihood)
+ date_formats = [
+ "%Y-%m-%d", # 2025-01-01
+ "%d.%m.%Y", # 01.01.2025
+ "%d/%m/%Y", # 01/01/2025
+ "%m/%d/%Y", # 01/01/2025 (US format)
+ "%Y-%m-%d %H:%M:%S", # 2025-01-01 12:00:00
+ "%d.%m.%Y %H:%M:%S", # 01.01.2025 12:00:00
+ "%d/%m/%Y %H:%M:%S", # 01/01/2025 12:00:00
+ "%Y-%m-%d %H:%M", # 2025-01-01 12:00
+ "%d.%m.%Y %H:%M", # 01.01.2025 12:00
+ "%d/%m/%Y %H:%M", # 01/01/2025 12:00
+ ]
+
+ # Try parsing with common formats first
+ for date_format in date_formats:
+ try:
+ parsed_date = datetime.strptime(text, date_format)
+ return parsed_date
+ except ValueError:
+ continue
+
+ # If dateutil is available, use it for more flexible parsing
+ if DATEUTIL_AVAILABLE:
+ try:
+ parsed_date = date_parser.parse(text, dayfirst=True, yearfirst=False)
+ return parsed_date
+ except (ValueError, TypeError):
+ pass
+
+ return None
+
def _sanitizeCellValue(self, value: Any) -> Any:
- """Sanitize cell value: remove markdown, convert to string, handle None, limit length."""
+ """Sanitize cell value: remove markdown, convert to string, handle None, limit length. Preserve numbers as numbers."""
if value is None:
return ""
if isinstance(value, dict):
@@ -994,6 +1040,45 @@ class RendererXlsx(BaseRenderer):
# Remove other markdown
text = text.replace("__", "").replace("_", "")
text = text.strip()
+
+ # Try to convert numeric strings to actual numbers
+ # This ensures Excel treats them as numbers, not strings
+ if text:
+ # Clean text for number conversion: remove common formatting characters
+ # but preserve the original for fallback
+ cleaned_for_number = text.replace("'", "").replace(",", "").replace(" ", "").strip()
+
+ # Only attempt conversion if cleaned text looks like a number
+ # (starts with digit, +, -, or . followed by digit)
+ if cleaned_for_number and (cleaned_for_number[0].isdigit() or cleaned_for_number[0] in '+-.'):
+ # Try integer first (more restrictive)
+ try:
+ # Check if it's a valid integer (no decimal point, no scientific notation)
+ if '.' not in cleaned_for_number and 'e' not in cleaned_for_number.lower() and 'E' not in cleaned_for_number:
+ int_value = int(cleaned_for_number)
+ return int_value
+ except (ValueError, OverflowError):
+ pass
+
+ # Try float if integer conversion failed
+ try:
+ float_value = float(cleaned_for_number)
+ # Only return as float if it's actually a number representation
+ # Avoid converting things like "NaN", "inf" which are valid floats but not useful
+ if cleaned_for_number.lower() not in ['nan', 'inf', '-inf', 'infinity', '-infinity']:
+ # Check for reasonable float values (not too large/small)
+ if abs(float_value) < 1e308: # Avoid overflow
+ return float_value
+ except (ValueError, OverflowError):
+ pass
+
+ # Try to convert date strings to datetime objects
+ # This ensures Excel treats them as dates, not strings
+ # Use original text (not cleaned) for date parsing
+ date_value = self._parseDateString(text)
+ if date_value is not None:
+ return date_value
+
# Excel cell value limit is 32,767 characters - truncate if necessary
if len(text) > 32767:
text = text[:32764] + "..."
diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py
index 50f7a84c..469ca6ae 100644
--- a/modules/services/serviceWeb/mainServiceWeb.py
+++ b/modules/services/serviceWeb/mainServiceWeb.py
@@ -2,7 +2,7 @@
# All rights reserved.
"""
Web crawl service for handling web research operations.
-Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
+Manages the two-step process: WEB_SEARCH_DATA then WEB_CRAWL.
"""
import json
@@ -35,7 +35,7 @@ class WebService:
"""
Perform web research in two steps:
1. Use AI to analyze prompt and extract parameters + URLs
- 2. Call WEB_SEARCH to get URLs (if needed)
+ 2. Call WEB_SEARCH_DATA to get URLs (if needed)
3. Combine URLs and filter to maxNumberPages
4. Call WEB_CRAWL for each URL
5. Return consolidated result
@@ -337,9 +337,9 @@ Return ONLY valid JSON, no additional text:
# Debug: persist search prompt
self.services.utils.writeDebugFile(searchPrompt, "websearch_prompt")
- # Call AI with WEB_SEARCH operation
+ # Call AI with WEB_SEARCH_DATA operation
searchOptions = AiCallOptions(
- operationType=OperationTypeEnum.WEB_SEARCH,
+ operationType=OperationTypeEnum.WEB_SEARCH_DATA,
resultFormat="json"
)
diff --git a/modules/workflows/methods/methodAi/methodAi.py b/modules/workflows/methods/methodAi/methodAi.py
index 86efe406..4cd98f14 100644
--- a/modules/workflows/methods/methodAi/methodAi.py
+++ b/modules/workflows/methods/methodAi/methodAi.py
@@ -37,6 +37,7 @@ class MethodAi(MethodBase):
"process": WorkflowActionDefinition(
actionId="ai.process",
description="Universal AI document processing action - accepts multiple input documents in any format and processes them together with a prompt",
+ dynamicMode=True,
parameters={
"aiPrompt": WorkflowActionParameter(
name="aiPrompt",
@@ -75,6 +76,7 @@ class MethodAi(MethodBase):
"webResearch": WorkflowActionDefinition(
actionId="ai.webResearch",
description="Web research with two-step process: search for URLs, then crawl content",
+ dynamicMode=True,
parameters={
"prompt": WorkflowActionParameter(
name="prompt",
@@ -120,6 +122,7 @@ class MethodAi(MethodBase):
"summarizeDocument": WorkflowActionDefinition(
actionId="ai.summarizeDocument",
description="Summarize one or more documents, extracting key points and main ideas",
+ dynamicMode=True,
parameters={
"documentList": WorkflowActionParameter(
name="documentList",
@@ -159,6 +162,7 @@ class MethodAi(MethodBase):
"translateDocument": WorkflowActionDefinition(
actionId="ai.translateDocument",
description="Translate documents to a target language while preserving formatting and structure",
+ dynamicMode=True,
parameters={
"documentList": WorkflowActionParameter(
name="documentList",
@@ -202,6 +206,7 @@ class MethodAi(MethodBase):
"convertDocument": WorkflowActionDefinition(
actionId="ai.convertDocument",
description="Convert documents between different formats (PDF→Word, Excel→CSV, etc.)",
+ dynamicMode=True,
parameters={
"documentList": WorkflowActionParameter(
name="documentList",
@@ -232,6 +237,7 @@ class MethodAi(MethodBase):
"generateDocument": WorkflowActionDefinition(
actionId="ai.generateDocument",
description="Generate documents from scratch or based on templates/inputs",
+ dynamicMode=True,
parameters={
"prompt": WorkflowActionParameter(
name="prompt",
@@ -269,6 +275,7 @@ class MethodAi(MethodBase):
"generateCode": WorkflowActionDefinition(
actionId="ai.generateCode",
description="Generate code files - explicitly sets intent to 'code'",
+ dynamicMode=True,
parameters={
"prompt": WorkflowActionParameter(
name="prompt",
diff --git a/modules/workflows/methods/methodContext/methodContext.py b/modules/workflows/methods/methodContext/methodContext.py
index 942f3f85..61afaf2e 100644
--- a/modules/workflows/methods/methodContext/methodContext.py
+++ b/modules/workflows/methods/methodContext/methodContext.py
@@ -35,6 +35,7 @@ class MethodContext(MethodBase):
"getDocumentIndex": WorkflowActionDefinition(
actionId="context.getDocumentIndex",
description="Generate a comprehensive index of all documents available in the current workflow",
+ dynamicMode=True,
parameters={
"resultType": WorkflowActionParameter(
name="resultType",
@@ -51,6 +52,7 @@ class MethodContext(MethodBase):
"extractContent": WorkflowActionDefinition(
actionId="context.extractContent",
description="Extract raw content parts from documents without AI processing. Returns ContentParts with different typeGroups (text, image, table, structure, container). Images are returned as base64 data, not as extracted text. Text content is extracted from text-based formats (PDF text layers, Word docs, etc.) but NOT from images (no OCR). Use this action to prepare documents for subsequent AI processing actions.",
+ dynamicMode=True,
parameters={
"documentList": WorkflowActionParameter(
name="documentList",
diff --git a/modules/workflows/methods/methodOutlook/methodOutlook.py b/modules/workflows/methods/methodOutlook/methodOutlook.py
index 31bc7dc3..4a978b7a 100644
--- a/modules/workflows/methods/methodOutlook/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook/methodOutlook.py
@@ -39,6 +39,7 @@ class MethodOutlook(MethodBase):
"readEmails": WorkflowActionDefinition(
actionId="outlook.readEmails",
description="Read emails and metadata from a mailbox folder",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -87,6 +88,7 @@ class MethodOutlook(MethodBase):
"searchEmails": WorkflowActionDefinition(
actionId="outlook.searchEmails",
description="Search emails by query and return matching items with metadata",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -135,6 +137,7 @@ class MethodOutlook(MethodBase):
"composeAndDraftEmailWithContext": WorkflowActionDefinition(
actionId="outlook.composeAndDraftEmailWithContext",
description="Compose email content using AI from context and optional documents, then create a draft",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -202,6 +205,7 @@ class MethodOutlook(MethodBase):
"sendDraftEmail": WorkflowActionDefinition(
actionId="outlook.sendDraftEmail",
description="Send draft email(s) using draft email JSON document(s) from action outlook.composeAndDraftEmailWithContext",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
diff --git a/modules/workflows/methods/methodSharepoint/methodSharepoint.py b/modules/workflows/methods/methodSharepoint/methodSharepoint.py
index 299d3fed..e8d41905 100644
--- a/modules/workflows/methods/methodSharepoint/methodSharepoint.py
+++ b/modules/workflows/methods/methodSharepoint/methodSharepoint.py
@@ -51,6 +51,7 @@ class MethodSharepoint(MethodBase):
"findDocumentPath": WorkflowActionDefinition(
actionId="sharepoint.findDocumentPath",
description="Find documents and folders by name/path across sites",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -88,6 +89,7 @@ class MethodSharepoint(MethodBase):
"readDocuments": WorkflowActionDefinition(
actionId="sharepoint.readDocuments",
description="Read documents from SharePoint and extract content/metadata",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -124,6 +126,7 @@ class MethodSharepoint(MethodBase):
"uploadDocument": WorkflowActionDefinition(
actionId="sharepoint.uploadDocument",
description="Upload documents to SharePoint",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -152,6 +155,7 @@ class MethodSharepoint(MethodBase):
"listDocuments": WorkflowActionDefinition(
actionId="sharepoint.listDocuments",
description="List documents and folders in SharePoint paths across sites",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -181,6 +185,7 @@ class MethodSharepoint(MethodBase):
"analyzeFolderUsage": WorkflowActionDefinition(
actionId="sharepoint.analyzeFolderUsage",
description="Analyze usage intensity of folders and files in SharePoint",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -225,6 +230,7 @@ class MethodSharepoint(MethodBase):
"findSiteByUrl": WorkflowActionDefinition(
actionId="sharepoint.findSiteByUrl",
description="Find SharePoint site by hostname and site path",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -253,6 +259,7 @@ class MethodSharepoint(MethodBase):
"downloadFileByPath": WorkflowActionDefinition(
actionId="sharepoint.downloadFileByPath",
description="Download file from SharePoint by exact file path",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -281,6 +288,7 @@ class MethodSharepoint(MethodBase):
"copyFile": WorkflowActionDefinition(
actionId="sharepoint.copyFile",
description="Copy file within SharePoint",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
@@ -330,6 +338,7 @@ class MethodSharepoint(MethodBase):
"uploadFile": WorkflowActionDefinition(
actionId="sharepoint.uploadFile",
description="Upload raw file content (bytes) to SharePoint",
+ dynamicMode=True,
parameters={
"connectionReference": WorkflowActionParameter(
name="connectionReference",
diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py
index c8920247..f94d08d4 100644
--- a/modules/workflows/processing/shared/placeholderFactory.py
+++ b/modules/workflows/processing/shared/placeholderFactory.py
@@ -79,8 +79,13 @@ def extractWorkflowHistory(service: Any) -> str:
logger.error(f"Error getting workflow history: {str(e)}")
return "No previous workflow rounds available"
-def extractAvailableMethods(service: Any) -> str:
- """Extract available methods for action planning. Maps to {{KEY:AVAILABLE_METHODS}}"""
+def extractAvailableMethods(service: Any, filterDynamicMode: bool = True) -> str:
+ """Extract available methods for action planning. Maps to {{KEY:AVAILABLE_METHODS}}
+
+ Args:
+ service: Service object
+ filterDynamicMode: If True, only include actions with dynamicMode=True flag (default: True for dynamic workflow prompts)
+ """
try:
# Get the methods dictionary directly from the global methods variable
if not methods:
@@ -105,7 +110,21 @@ def extractAvailableMethods(service: Any) -> str:
processed_methods.add(shortName)
+ # Get method instance to access _actions dictionary with WorkflowActionDefinition objects
+ methodInstance = methodInfo.get('instance')
+ if not methodInstance:
+ continue
+
for actionName, actionInfo in methodInfo['actions'].items():
+ # Check dynamicMode flag if filtering is enabled
+ if filterDynamicMode:
+ # Access original WorkflowActionDefinition from _actions dictionary
+ if hasattr(methodInstance, '_actions') and actionName in methodInstance._actions:
+ actionDef = methodInstance._actions[actionName]
+ # Only include actions with dynamicMode=True
+ if not getattr(actionDef, 'dynamicMode', False):
+ continue
+
# Create compound action name: method.action
compoundActionName = f"{shortName}.{actionName}"
# Get the action description
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 593ba555..4d1abd0c 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -155,6 +155,15 @@ class WorkflowManager:
async def _workflowProcess(self, userInput: UserInputRequest) -> None:
"""Process a workflow with user input"""
try:
+ # Send ChatLog message immediately when workflow starts
+ workflow = self.services.workflow
+ self.services.chat.storeLog(workflow, {
+ "message": "Workflow started...",
+ "type": "info",
+ "status": "running",
+ "progress": 0.0
+ })
+
# Store the current user prompt in services for easy access throughout the workflow
self.services.rawUserPrompt = userInput.prompt
self.services.currentUserPrompt = userInput.prompt
diff --git a/tests/functional/test01_ai_model_selection.py b/tests/functional/test01_ai_model_selection.py
index 84b22494..b06e9c64 100644
--- a/tests/functional/test01_ai_model_selection.py
+++ b/tests/functional/test01_ai_model_selection.py
@@ -252,7 +252,7 @@ class ModelSelectionTester:
print(f"{'='*80}")
options = AiCallOptions(
- operationType=OperationTypeEnum.WEB_SEARCH,
+ operationType=OperationTypeEnum.WEB_SEARCH_DATA,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.ADVANCED,
maxCost=0.05,
@@ -269,7 +269,7 @@ class ModelSelectionTester:
print(f"{'='*80}")
options = AiCallOptions(
- operationType=OperationTypeEnum.WEB_SEARCH,
+ operationType=OperationTypeEnum.WEB_SEARCH_DATA,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC,
maxCost=0.01,
@@ -327,7 +327,7 @@ class ModelSelectionTester:
# This method uses webQuery internally, so it uses the same model selection as web research
options = AiCallOptions(
- operationType=OperationTypeEnum.WEB_SEARCH,
+ operationType=OperationTypeEnum.WEB_SEARCH_DATA,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.ADVANCED,
maxCost=0.03,
@@ -436,7 +436,7 @@ class ModelSelectionTester:
print("\n Testing: aiObjects.webQuery() - Web Research")
try:
options = AiCallOptions(
- operationType=OperationTypeEnum.WEB_SEARCH,
+ operationType=OperationTypeEnum.WEB_SEARCH_DATA,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.ADVANCED,
maxCost=0.05,
diff --git a/tests/functional/test02_ai_models.py b/tests/functional/test02_ai_models.py
index 0578ba7b..12a374f8 100644
--- a/tests/functional/test02_ai_models.py
+++ b/tests/functional/test02_ai_models.py
@@ -11,7 +11,7 @@ This script tests all available models with all their supported operation types:
- DATA_EXTRACT: Data extraction
- IMAGE_ANALYSE: Image analysis
- IMAGE_GENERATE: Image generation
-- WEB_SEARCH: Web search
+- WEB_SEARCH_DATA: Web search
- WEB_CRAWL: Web crawling
For each model, it tests every operation type the model supports and validates
@@ -119,7 +119,7 @@ class AIModelsTester:
OperationTypeEnum.DATA_EXTRACT: "Extract key information from this text about artificial intelligence trends.",
OperationTypeEnum.IMAGE_ANALYSE: "Describe what you see in this image.",
OperationTypeEnum.IMAGE_GENERATE: "A futuristic cityscape with flying cars and neon lights.",
- OperationTypeEnum.WEB_SEARCH: "Who works in valueon ag in switzerland?", # Search query for valueon.ch
+ OperationTypeEnum.WEB_SEARCH_DATA: "Who works in valueon ag in switzerland?", # Search query for valueon.ch
OperationTypeEnum.WEB_CRAWL: "https://www.valueon.ch" # URL to crawl
}
return prompts.get(operationType, "Test prompt for this operation type.")
@@ -195,7 +195,7 @@ class AIModelsTester:
)
# Update message content to JSON format
messages[0]["content"] = json.dumps(imagePrompt.model_dump())
- elif operationType == OperationTypeEnum.WEB_SEARCH:
+ elif operationType == OperationTypeEnum.WEB_SEARCH_DATA:
# Create structured prompt for web search
webSearchPrompt = AiCallPromptWebSearch(
instruction=testPrompt,
diff --git a/tests/functional/test03_ai_operations.py b/tests/functional/test03_ai_operations.py
index 259932c2..36a8505a 100644
--- a/tests/functional/test03_ai_operations.py
+++ b/tests/functional/test03_ai_operations.py
@@ -74,7 +74,7 @@ class MethodAiOperationsTester:
"aiPrompt": "A beautiful sunset over the ocean with purple and orange hues",
"resultType": "png"
},
- OperationTypeEnum.WEB_SEARCH: {
+ OperationTypeEnum.WEB_SEARCH_DATA: {
"aiPrompt": "Who works in valueon ag in switzerland?",
"resultType": "json"
},
diff --git a/tests/unit/services/test_json_extraction_merging.py b/tests/unit/services/test_json_extraction_merging.py
new file mode 100644
index 00000000..07ecfa4b
--- /dev/null
+++ b/tests/unit/services/test_json_extraction_merging.py
@@ -0,0 +1,386 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Test script for JSON extraction response detection and merging.
+Run: python gateway/tests/unit/services/test_json_extraction_merging.py
+"""
+
+import json
+import sys
+import os
+
+# Add gateway to path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
+
+
+def test_detects_json_with_code_fences():
+ """Test that JSON extraction responses with markdown code fences are detected"""
+ print("Test 1: Detecting JSON with code fences...")
+ service = ExtractionService(None)
+
+ content_part = ContentPart(
+ id="test1",
+ label="test1",
+ typeGroup="text",
+ mimeType="text/plain",
+ data='```json\n{"extracted_content": {"text": "Sample text", "tables": []}}\n```'
+ )
+
+ result = service._isJsonExtractionResponse([content_part])
+ assert result == True, "Should detect JSON with code fences"
+ print(" [PASS]")
+
+
+def test_detects_json_without_code_fences():
+ """Test that JSON extraction responses without code fences are detected"""
+ print("Test 2: Detecting JSON without code fences...")
+ service = ExtractionService(None)
+
+ content_part = ContentPart(
+ id="test2",
+ label="test2",
+ typeGroup="text",
+ mimeType="text/plain",
+ data='{"extracted_content": {"text": "Sample text", "tables": []}}'
+ )
+
+ result = service._isJsonExtractionResponse([content_part])
+ assert result == True, "Should detect JSON without code fences"
+ print(" [PASS]")
+
+
+def test_rejects_non_extraction_json():
+ """Test that regular JSON (without extracted_content) is rejected"""
+ print("Test 3: Rejecting non-extraction JSON...")
+ service = ExtractionService(None)
+
+ content_part = ContentPart(
+ id="test3",
+ label="test3",
+ typeGroup="text",
+ mimeType="text/plain",
+ data='{"documents": [{"sections": []}]}'
+ )
+
+ result = service._isJsonExtractionResponse([content_part])
+ assert result == False, "Should reject non-extraction JSON"
+ print(" [PASS]")
+
+
+def test_rejects_non_json_content():
+ """Test that non-JSON content is rejected"""
+ print("Test 4: Rejecting non-JSON content...")
+ service = ExtractionService(None)
+
+ content_part = ContentPart(
+ id="test4",
+ label="test4",
+ typeGroup="text",
+ mimeType="text/plain",
+ data="This is plain text, not JSON"
+ )
+
+ result = service._isJsonExtractionResponse([content_part])
+ assert result == False, "Should reject non-JSON content"
+ print(" [PASS]")
+
+
+def test_merges_tables_with_same_headers():
+ """Test that tables with identical headers are merged"""
+ print("Test 5: Merging tables with same headers...")
+ service = ExtractionService(None)
+
+ part1 = ContentPart(
+ id="test1",
+ label="test1",
+ typeGroup="table",
+ mimeType="application/json",
+ data='```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Alice", "100"], ["Bob", "200"]]}]}}\n```'
+ )
+
+ part2 = ContentPart(
+ id="test2",
+ label="test2",
+ typeGroup="table",
+ mimeType="application/json",
+ data='```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Charlie", "300"], ["Alice", "100"]]}]}}\n```'
+ )
+
+ merged = service._mergeJsonExtractionResponses([part1, part2])
+
+ # Should have one table group with merged rows
+ assert len(merged["extracted_content"]["tables"]) == 1, f"Should have one merged table, got {len(merged['extracted_content']['tables'])}"
+ table = merged["extracted_content"]["tables"][0]
+ assert table["headers"] == ["Name", "Amount"], f"Headers should match, got {table['headers']}"
+ # Should have 3 unique rows (Alice appears twice but should be deduplicated)
+ assert len(table["rows"]) == 3, f"Should have 3 unique rows, got {len(table['rows'])}"
+ assert ["Alice", "100"] in table["rows"], "Alice row should be present"
+ assert ["Bob", "200"] in table["rows"], "Bob row should be present"
+ assert ["Charlie", "300"] in table["rows"], "Charlie row should be present"
+ print(" [PASS]")
+
+
+def test_merges_multiple_json_blocks_separated_by_dash():
+ """Test that multiple JSON blocks separated by --- are merged"""
+ print("Test 6: Merging multiple JSON blocks separated by ---...")
+ service = ExtractionService(None)
+
+ # Create content part with multiple JSON blocks separated by ---
+ part1 = ContentPart(
+ id="test1",
+ label="test1",
+ typeGroup="table",
+ mimeType="application/json",
+ data='```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Alice", "100"]]}]}}\n```\n---\n```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Amount"], "rows": [["Bob", "200"]]}]}}\n```'
+ )
+
+ merged = service._mergeJsonExtractionResponses([part1])
+
+ # Should have one table with merged rows from both JSON blocks
+ assert len(merged["extracted_content"]["tables"]) == 1, f"Should have one merged table, got {len(merged['extracted_content']['tables'])}"
+ table = merged["extracted_content"]["tables"][0]
+ assert table["headers"] == ["Name", "Amount"], f"Headers should match, got {table['headers']}"
+ assert len(table["rows"]) == 2, f"Should have 2 rows, got {len(table['rows'])}"
+ assert ["Alice", "100"] in table["rows"], "Alice row should be present"
+ assert ["Bob", "200"] in table["rows"], "Bob row should be present"
+ print(" [PASS]")
+
+
def test_merges_text_content():
    """Verify that the text of every content part survives into the merged result."""
    print("Test 7: Merging text content...")
    service = ExtractionService(None)

    # Build the two text parts from (id, payload) pairs instead of literals.
    parts = []
    for name, payload in (
        ("test1", '```json\n{"extracted_content": {"text": "First paragraph."}}\n```'),
        ("test2", '```json\n{"extracted_content": {"text": "Second paragraph."}}\n```'),
    ):
        parts.append(
            ContentPart(
                id=name,
                label=name,
                typeGroup="text",
                mimeType="text/plain",
                data=payload,
            )
        )

    merged = service._mergeJsonExtractionResponses(parts)

    # Merged text is a newline-joined concatenation; both fragments must appear.
    combined = merged["extracted_content"]["text"]
    assert "First paragraph." in combined, "First paragraph should be present"
    assert "Second paragraph." in combined, "Second paragraph should be present"
    print(" [PASS]")
+
def test_merges_headings_and_lists():
    """Verify that heading and list collections from two parts are concatenated in order."""
    print("Test 8: Merging headings and lists...")
    service = ExtractionService(None)

    first = ContentPart(
        id="test1",
        label="test1",
        typeGroup="text",
        mimeType="text/plain",
        data='```json\n{"extracted_content": {"headings": [{"level": 1, "text": "Title 1"}], "lists": [{"type": "bullet", "items": ["Item 1"]}]}}\n```',
    )
    second = ContentPart(
        id="test2",
        label="test2",
        typeGroup="text",
        mimeType="text/plain",
        data='```json\n{"extracted_content": {"headings": [{"level": 2, "text": "Subtitle 1"}], "lists": [{"type": "bullet", "items": ["Item 2"]}]}}\n```',
    )

    merged = service._mergeJsonExtractionResponses([first, second])
    content = merged["extracted_content"]

    # Both headings survive, preserving part order.
    headings = content["headings"]
    assert len(headings) == 2, f"Should have 2 headings, got {len(headings)}"
    assert headings[0]["text"] == "Title 1", "First heading should be Title 1"
    assert headings[1]["text"] == "Subtitle 1", "Second heading should be Subtitle 1"

    # Both lists survive as separate entries.
    lists = content["lists"]
    assert len(lists) == 2, f"Should have 2 lists, got {len(lists)}"
    assert lists[0]["items"] == ["Item 1"], "First list should have Item 1"
    assert lists[1]["items"] == ["Item 2"], "Second list should have Item 2"
    print(" [PASS]")
+
def test_handles_empty_content_parts():
    """Verify that a part with an empty payload does not disturb merging of the others."""
    print("Test 9: Handling empty content parts...")
    service = ExtractionService(None)

    populated = ContentPart(
        id="test1",
        label="test1",
        typeGroup="text",
        mimeType="text/plain",
        data='```json\n{"extracted_content": {"text": "Some text"}}\n```',
    )
    # Deliberately empty payload — must be skipped gracefully, not crash the merge.
    blank = ContentPart(
        id="test2",
        label="test2",
        typeGroup="text",
        mimeType="text/plain",
        data="",
    )

    merged = service._mergeJsonExtractionResponses([populated, blank])

    # Only the populated part contributes text.
    assert merged["extracted_content"]["text"] == "Some text", "Should have text from part1"
    print(" [PASS]")
+
def test_merges_tables_with_different_headers():
    """Verify that tables whose header rows differ are kept as distinct tables."""
    print("Test 10: Keeping tables with different headers separate...")
    service = ExtractionService(None)

    # One table per part, with incompatible header sets.
    payloads = {
        "test1": '```json\n{"extracted_content": {"tables": [{"headers": ["Date", "Amount"], "rows": [["2024-01-01", "100"]]}]}}\n```',
        "test2": '```json\n{"extracted_content": {"tables": [{"headers": ["Name", "Email"], "rows": [["Alice", "alice@example.com"]]}]}}\n```',
    }
    parts = [
        ContentPart(
            id=name,
            label=name,
            typeGroup="table",
            mimeType="application/json",
            data=payload,
        )
        for name, payload in payloads.items()
    ]

    merged = service._mergeJsonExtractionResponses(parts)
    tables = merged["extracted_content"]["tables"]

    # Different headers must not be merged into one table.
    assert len(tables) == 2, f"Should have 2 separate tables, got {len(tables)}"

    date_table, contact_table = tables
    assert date_table["headers"] == ["Date", "Amount"], "First table should have Date/Amount headers"
    assert len(date_table["rows"]) == 1, "First table should have 1 row"
    assert contact_table["headers"] == ["Name", "Email"], "Second table should have Name/Email headers"
    assert len(contact_table["rows"]) == 1, "Second table should have 1 row"
    print(" [PASS]")
+
def test_real_world_scenario():
    """Simulate three documents (one containing two '---'-separated JSON blocks)
    and check that same-header tables merge with duplicate rows removed."""
    print("Test 11: Real-world scenario (multiple documents, multiple JSON blocks)...")
    service = ExtractionService(None)

    # doc3 carries two JSON blocks separated by --- in a single payload.
    documents = [
        ("doc1", '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN001", "2024-01-01", "100.00"], ["TXN002", "2024-01-02", "200.00"]]}]}}\n```'),
        ("doc2", '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN003", "2024-01-03", "300.00"], ["TXN001", "2024-01-01", "100.00"]]}]}}\n```'),
        ("doc3", '```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN004", "2024-01-04", "400.00"]]}]}}\n```\n---\n```json\n{"extracted_content": {"tables": [{"headers": ["Transaction ID", "Date", "Amount"], "rows": [["TXN005", "2024-01-05", "500.00"]]}]}}\n```'),
    ]
    parts = [
        ContentPart(
            id=name,
            label=name,
            typeGroup="table",
            mimeType="application/json",
            data=payload,
        )
        for name, payload in documents
    ]

    merged = service._mergeJsonExtractionResponses(parts)
    tables = merged["extracted_content"]["tables"]

    # Identical headers collapse everything into a single table.
    assert len(tables) == 1, f"Should have one merged table, got {len(tables)}"
    table = tables[0]
    assert table["headers"] == ["Transaction ID", "Date", "Amount"], "Headers should match"

    # TXN001 appears in two documents but must be kept only once.
    assert len(table["rows"]) == 5, f"Should have 5 unique rows, got {len(table['rows'])}"

    txn_ids = [row[0] for row in table["rows"]]
    for txn in ("TXN001", "TXN002", "TXN003", "TXN004", "TXN005"):
        assert txn in txn_ids, f"{txn} should be present"
    assert txn_ids.count("TXN001") == 1, "TXN001 should appear only once (deduplicated)"

    print(" [PASS]")
+
def main():
    """Execute every merging test, print a summary, and return a process exit code
    (0 when all tests pass, 1 otherwise)."""
    banner = "=" * 60
    print(banner)
    print("Testing JSON Extraction Response Detection and Merging")
    print(banner)
    print()

    all_tests = (
        test_detects_json_with_code_fences,
        test_detects_json_without_code_fences,
        test_rejects_non_extraction_json,
        test_rejects_non_json_content,
        test_merges_tables_with_same_headers,
        test_merges_multiple_json_blocks_separated_by_dash,
        test_merges_text_content,
        test_merges_headings_and_lists,
        test_handles_empty_content_parts,
        test_merges_tables_with_different_headers,
        test_real_world_scenario,
    )

    passed = 0
    failed = 0

    for case in all_tests:
        try:
            case()
        except AssertionError as err:
            # An assertion failure is a plain test failure.
            print(f" [FAIL] {err}")
            failed += 1
        except Exception as err:
            # Anything else is an unexpected error; show the traceback for debugging.
            print(f" [ERROR] {err}")
            import traceback
            traceback.print_exc()
            failed += 1
        else:
            passed += 1
        print()

    print(banner)
    print(f"Results: {passed} passed, {failed} failed")
    print(banner)

    return 0 if failed == 0 else 1
+
if __name__ == "__main__":
    # Raising SystemExit with main()'s return value is equivalent to sys.exit(...).
    raise SystemExit(main())