From f0733204fb5625ee48c94ffeb118c9f37cb4f447 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 14 Oct 2025 22:48:55 +0200
Subject: [PATCH] content mapping and transformation validated
---
.../serviceAi/subDocumentProcessing.py | 28 +++--
.../renderers/rendererText.py | 22 +++-
.../serviceGeneration/subPromptBuilder.py | 119 +++++++++++++++---
.../serviceWorkflow/mainServiceWorkflow.py | 28 ++++-
.../processing/adaptive/contentValidator.py | 79 +++++++++++-
.../workflows/processing/modes/modeReact.py | 5 +-
.../shared/promptGenerationTaskplan.py | 10 ++
7 files changed, 249 insertions(+), 42 deletions(-)
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
index a0dc5088..e96a9394 100644
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ b/modules/services/serviceAi/subDocumentProcessing.py
@@ -176,22 +176,28 @@ class SubDocumentProcessing:
# Merge with JSON mode
mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options)
- # Normalize merged JSON into a single canonical table
+ # Normalize merged JSON into a single canonical table (only if table content exists)
try:
from modules.services.serviceNormalization.mainServiceNormalization import NormalizationService
normalizer = NormalizationService(self.services)
inventory = normalizer.discoverStructures(mergedJsonDocument)
- # Use workflow id as cache key
- cacheKey = self.services.currentWorkflow.id
- # Provide the extraction/merge prompt context when available to help mapping
- mergePrompt = prompt
- mapping = await normalizer.requestHeaderMapping(inventory, cacheKey, None, mergePrompt)
- canonical = normalizer.applyMapping(mergedJsonDocument, mapping)
- report = normalizer.validateCanonical(canonical)
- if report.get('success'):
- mergedJsonDocument = canonical
+
+ # Check if any table content was discovered
+ tableHeaders = inventory.get("tableHeaders", [])
+ if not tableHeaders:
+ logger.info("No table content found in merged JSON, skipping normalization and returning original structure")
else:
- raise ValueError('Normalization produced zero rows')
+ # Use workflow id as cache key
+ cacheKey = self.services.currentWorkflow.id
+ # Provide the extraction/merge prompt context when available to help mapping
+ mergePrompt = prompt
+ mapping = await normalizer.requestHeaderMapping(inventory, cacheKey, None, mergePrompt)
+ canonical = normalizer.applyMapping(mergedJsonDocument, mapping)
+ report = normalizer.validateCanonical(canonical)
+ if report.get('success'):
+ mergedJsonDocument = canonical
+ else:
+ raise ValueError('Normalization produced zero rows')
except Exception as e:
# Surface normalization failure while leaving original merged JSON (single-path expectation is to fail)
raise
diff --git a/modules/services/serviceGeneration/renderers/rendererText.py b/modules/services/serviceGeneration/renderers/rendererText.py
index 6ca1415b..33d648e8 100644
--- a/modules/services/serviceGeneration/renderers/rendererText.py
+++ b/modules/services/serviceGeneration/renderers/rendererText.py
@@ -108,16 +108,30 @@ class RendererText(BaseRenderer):
elif section_type == "bullet_list":
return self._render_json_bullet_list(section_data)
elif section_type == "heading":
- return self._render_json_heading(section_data)
+ # Render each heading element in the elements array
+ # section_data is already the elements array from _get_section_data
+ rendered_elements = []
+ for element in section_data:
+ rendered_elements.append(self._render_json_heading(element))
+ return "\n".join(rendered_elements)
elif section_type == "paragraph":
- return self._render_json_paragraph(section_data)
+ # Render each paragraph element in the elements array
+ # section_data is already the elements array from _get_section_data
+ rendered_elements = []
+ for element in section_data:
+ rendered_elements.append(self._render_json_paragraph(element))
+ return "\n".join(rendered_elements)
elif section_type == "code_block":
return self._render_json_code_block(section_data)
elif section_type == "image":
return self._render_json_image(section_data)
else:
- # Fallback to paragraph for unknown types
- return self._render_json_paragraph(section_data)
+ # Fallback to paragraph for unknown types - render each element
+ # section_data is already the elements array from _get_section_data
+ rendered_elements = []
+ for element in section_data:
+ rendered_elements.append(self._render_json_paragraph(element))
+ return "\n".join(rendered_elements)
except Exception as e:
self.logger.warning(f"Error rendering section {self._get_section_id(section)}: {str(e)}")
diff --git a/modules/services/serviceGeneration/subPromptBuilder.py b/modules/services/serviceGeneration/subPromptBuilder.py
index c0139f45..cbcce375 100644
--- a/modules/services/serviceGeneration/subPromptBuilder.py
+++ b/modules/services/serviceGeneration/subPromptBuilder.py
@@ -45,7 +45,28 @@ async def buildAdaptiveExtractionPrompt(
"filename": "section_1.xlsx",
"sections": [
{
- "id": "table_1",
+ "id": "section_1",
+ "content_type": "heading",
+ "elements": [
+ {
+ "level": 1,
+ "text": "1. SECTION TITLE"
+ }
+ ],
+ "order": 1
+ },
+ {
+ "id": "section_2",
+ "content_type": "paragraph",
+ "elements": [
+ {
+ "text": "This is the actual content that should be extracted from the document."
+ }
+ ],
+ "order": 2
+ },
+ {
+ "id": "section_3",
"content_type": "table",
"elements": [
{
@@ -53,7 +74,7 @@ async def buildAdaptiveExtractionPrompt(
"rows": [["Value 1", "Value 2"]]
}
],
- "order": 1
+ "order": 3
}
]
}
@@ -69,7 +90,28 @@ async def buildAdaptiveExtractionPrompt(
},
"sections": [
{
- "id": "table_1",
+ "id": "section_1",
+ "content_type": "heading",
+ "elements": [
+ {
+ "level": 1,
+ "text": "1. SECTION TITLE"
+ }
+ ],
+ "order": 1
+ },
+ {
+ "id": "section_2",
+ "content_type": "paragraph",
+ "elements": [
+ {
+ "text": "This is the actual content that should be extracted from the document."
+ }
+ ],
+ "order": 2
+ },
+ {
+ "id": "section_3",
"content_type": "table",
"elements": [
{
@@ -77,7 +119,7 @@ async def buildAdaptiveExtractionPrompt(
"rows": [["Value 1", "Value 2"]]
}
],
- "order": 1
+ "order": 3
}
]
}
@@ -253,14 +295,11 @@ Consider the user's intent and the most logical way to organize the extracted co
"sections": [
{
"id": "section_001",
- "content_type": "table",
+ "content_type": "heading",
"elements": [
{
- "headers": ["Column 1", "Column 2", "Column 3"],
- "rows": [
- ["Value 1", "Value 2", "Value 3"],
- ["Value 4", "Value 5", "Value 6"]
- ]
+ "level": 1,
+ "text": "1. SECTION TITLE"
}
],
"order": 1,
@@ -340,7 +379,7 @@ async def buildExtractionPrompt(
from .subJsonSchema import get_document_subJsonSchema
jsonSchema = get_document_subJsonSchema()
- # Generic block for JSON extraction - use example data instead of schema
+ # Generic block for JSON extraction - use mixed example data showing different content types
example_data = {
"metadata": {
"title": "Example Document",
@@ -351,6 +390,29 @@ async def buildExtractionPrompt(
"sections": [
{
"id": "section_001",
+ "content_type": "heading",
+ "elements": [
+ {
+ "level": 1,
+ "text": "1. INTRODUCTION"
+ }
+ ],
+ "order": 1,
+ "metadata": {}
+ },
+ {
+ "id": "section_002",
+ "content_type": "paragraph",
+ "elements": [
+ {
+ "text": "This is a sample paragraph with actual content that should be extracted from the document."
+ }
+ ],
+ "order": 2,
+ "metadata": {}
+ },
+ {
+ "id": "section_003",
"content_type": "table",
"elements": [
{
@@ -361,7 +423,7 @@ async def buildExtractionPrompt(
]
}
],
- "order": 1,
+ "order": 3,
"metadata": {}
}
],
@@ -486,17 +548,38 @@ CRITICAL: The AI MUST generate content using the CANONICAL JSON FORMAT with this
"sections": [
{{
"id": "section_1",
- "content_type": "table",
+ "content_type": "heading",
"elements": [
{{
- "headers": ["Column1", "Column2", "Column3"],
- "rows": [
- ["Value1", "Value2", "Value3"],
- ["Value4", "Value5", "Value6"]
- ]
+ "level": 1,
+ "text": "1. SECTION TITLE"
}}
],
"order": 1
+ }},
+ {{
+ "id": "section_2",
+ "content_type": "paragraph",
+ "elements": [
+ {{
+ "text": "This is the actual content that should be extracted from the document."
+ }}
+ ],
+ "order": 2
+ }},
+ {{
+ "id": "section_3",
+ "content_type": "table",
+ "elements": [
+ {{
+ "headers": ["Column 1", "Column 2", "Column 3"],
+ "rows": [
+ ["Value 1", "Value 2", "Value 3"],
+ ["Value 4", "Value 5", "Value 6"]
+ ]
+ }}
+ ],
+ "order": 3
}}
]
}}
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index d5f71cfd..afc4e3b5 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -597,12 +597,16 @@ class WorkflowService:
if not workflow or not hasattr(workflow, 'messages'):
return "No documents available"
- # Reload workflow from database to ensure we have all messages
- if hasattr(workflow, 'id'):
- try:
- workflow = self.getWorkflow(workflow.id)
- except Exception as e:
- logger.warning(f"Could not reload workflow from database: {str(e)}")
+ # Use the provided workflow object directly to avoid database reload issues
+ # that can cause filename truncation. The workflow object should already be up-to-date.
+ logger.debug(f"Using provided workflow object for getAvailableDocuments (ID: {workflow.id if hasattr(workflow, 'id') else 'unknown'})")
+
+ # Debug: Check document filenames in the workflow object
+ if hasattr(workflow, 'messages') and workflow.messages:
+ for message in workflow.messages:
+ if hasattr(message, 'documents') and message.documents:
+ for doc in message.documents:
+ logger.debug(f"Workflow document {doc.id}: fileName='{doc.fileName}' (length: {len(doc.fileName)})")
# Get document reference list using the exact same logic as old system
document_list = self._getDocumentReferenceList(workflow)
@@ -739,12 +743,22 @@ class WorkflowService:
"""Update file attributes (fileName, fileSize, mimeType) for documents"""
for doc in documents:
try:
+ # Debug: Log original filename before refresh
+ original_filename = doc.fileName
+ logger.debug(f"Before refresh - Document {doc.id}: fileName='{original_filename}' (length: {len(original_filename)})")
+
# Use the proper WorkflowService method to get file info
file_info = self.getFileInfo(doc.fileId)
if file_info:
+ db_filename = file_info.get("fileName", doc.fileName)
+ logger.debug(f"Database filename for {doc.id}: '{db_filename}' (length: {len(db_filename)})")
+
doc.fileName = file_info.get("fileName", doc.fileName)
doc.fileSize = file_info.get("size", doc.fileSize)
doc.mimeType = file_info.get("mimeType", doc.mimeType)
+
+ # Debug: Log final filename after refresh
+ logger.debug(f"After refresh - Document {doc.id}: fileName='{doc.fileName}' (length: {len(doc.fileName)})")
else:
logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}")
except Exception as e:
@@ -760,6 +774,8 @@ class WorkflowService:
def _getDocumentReferenceFromChatDocument(self, document, message) -> str:
"""Get document reference using document ID and filename."""
try:
+ # Debug logging to track filename truncation
+ logger.debug(f"Creating document reference for {document.id}: fileName='{document.fileName}' (length: {len(document.fileName)})")
# Use document ID and filename for simple reference
return f"docItem:{document.id}:{document.fileName}"
except Exception as e:
diff --git a/modules/workflows/processing/adaptive/contentValidator.py b/modules/workflows/processing/adaptive/contentValidator.py
index 48339cb4..5253ab5e 100644
--- a/modules/workflows/processing/adaptive/contentValidator.py
+++ b/modules/workflows/processing/adaptive/contentValidator.py
@@ -3,6 +3,7 @@
import re
import logging
+import json
from typing import List, Dict, Any
logger = logging.getLogger(__name__)
@@ -14,8 +15,14 @@ class ContentValidator:
pass
def validateContent(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]:
- """Validates delivered content against user intent"""
+ """Validates delivered content against user intent using AI"""
try:
+ # First, try AI-based validation for intelligent gap analysis
+ aiValidation = self._validateWithAI(documents, intent)
+ if aiValidation:
+ return aiValidation
+
+ # Fallback to rule-based validation if AI validation fails
validationDetails = []
for doc in documents:
@@ -306,3 +313,73 @@ class ContentValidator:
"validationDetails": [],
"improvementSuggestions": [f"Validation failed: {error}"]
}
+
+ def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any]) -> Dict[str, Any]:
+ """AI-based validation to intelligently assess task completion"""
+ try:
+ # Extract content from all documents
+ documentContents = []
+ for doc in documents:
+ content = self._extractContent(doc)
+ documentContents.append({
+ "name": getattr(doc, 'documentName', 'Unknown'),
+ "content": content[:2000] # Limit content for AI processing
+ })
+
+ # Create AI validation prompt
+ validationPrompt = f"""
+You are a task completion validator. Analyze if the delivered content actually fulfills the user's request.
+
+USER REQUEST: {intent.get('primaryGoal', 'Unknown')}
+
+DELIVERED CONTENT:
+{json.dumps(documentContents, indent=2)}
+
+TASK: Determine if the user's request has been fully completed.
+
+Analyze the gap between what was requested and what was delivered. Consider any missing elements, incorrect formats, incomplete work, or other discrepancies.
+
+Respond with JSON only:
+{{
+ "overallSuccess": true/false,
+ "qualityScore": 0.0-1.0,
+ "gapAnalysis": "Detailed analysis of what's missing or incorrect",
+ "improvementSuggestions": ["specific action 1", "specific action 2"]
+}}
+"""
+
+ # Call AI service for validation
+ from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
+ request_options = AiCallOptions()
+ request_options.operationType = OperationType.GENERAL
+
+ request = AiCallRequest(prompt=validationPrompt, context="", options=request_options)
+
+ # Get AI service from the workflow context
+ if hasattr(self, 'services') and hasattr(self.services, 'ai'):
+ response = self.services.ai.aiObjects.call(request)
+ if response and response.content:
+ import re
+ result = response.content.strip()
+ json_match = re.search(r'\{.*\}', result, re.DOTALL)
+ if json_match:
+ result = json_match.group(0)
+
+ aiResult = json.loads(result)
+
+ return {
+ "overallSuccess": aiResult.get("overallSuccess", False),
+ "qualityScore": aiResult.get("qualityScore", 0.0),
+ "validationDetails": [{
+ "documentName": "AI Validation",
+ "gapAnalysis": aiResult.get("gapAnalysis", ""),
+ "successCriteriaMet": [aiResult.get("overallSuccess", False)]
+ }],
+ "improvementSuggestions": aiResult.get("improvementSuggestions", [])
+ }
+
+ return None # Fallback to rule-based validation
+
+ except Exception as e:
+ logger.error(f"AI validation failed: {str(e)}")
+ return None # Fallback to rule-based validation
\ No newline at end of file
diff --git a/modules/workflows/processing/modes/modeReact.py b/modules/workflows/processing/modes/modeReact.py
index 606d1123..1bc893a5 100644
--- a/modules/workflows/processing/modes/modeReact.py
+++ b/modules/workflows/processing/modes/modeReact.py
@@ -33,6 +33,7 @@ class ReactMode(BaseMode):
# Initialize adaptive components
self.intentAnalyzer = IntentAnalyzer()
self.contentValidator = ContentValidator()
+ self.contentValidator.services = self.services # Pass services for AI validation
self.learningEngine = LearningEngine()
self.progressTracker = ProgressTracker()
self.currentIntent = None
@@ -235,8 +236,8 @@ class ReactMode(BaseMode):
valid_refs = []
for line in available_docs.split('\n'):
if 'docList:' in line or 'docItem:' in line:
- # Extract reference from line like " - docList:msg_xxx:label"
- ref_match = re.search(r'(docList:[^\s]+|docItem:[^\s]+)', line)
+ # Extract reference from line like " - docList:msg_xxx:label" or " - docItem:xxx:filename with spaces"
+ ref_match = re.search(r'(docList:[^\s]+|docItem:[^\s]+(?:\s+[^\s]+)*)', line)
if ref_match:
valid_refs.append(ref_match.group(1))
diff --git a/modules/workflows/processing/shared/promptGenerationTaskplan.py b/modules/workflows/processing/shared/promptGenerationTaskplan.py
index 08f18493..f5fb3407 100644
--- a/modules/workflows/processing/shared/promptGenerationTaskplan.py
+++ b/modules/workflows/processing/shared/promptGenerationTaskplan.py
@@ -28,6 +28,8 @@ def generateTaskPlanningPrompt(services, context: Any) -> PromptBundle:
Break down user requests into logical, executable task steps.
+**IMPORTANT**: If the user asks for ONE complete business objective, create ONLY ONE task that accomplishes the entire objective. Do NOT split it into multiple micro-tasks.
+
## 📋 Context
### User Request
@@ -46,12 +48,20 @@ Break down user requests into logical, executable task steps.
- **ONE TOPIC PER TASK** - Each task should handle one complete business objective
- **HIGH-LEVEL FOCUS** - Plan strategic outcomes, not implementation steps
- **AVOID MICRO-TASKS** - Don't create separate tasks for each small action
+- **CRITICAL**: If the user asks for ONE thing (like "analyse document list and produce summary"), create ONLY ONE task that does the complete job
### Task Grouping Examples
- **Research + Analysis + Report** → ONE task: "Web research report"
- **Data Collection + Processing + Visualization** → ONE task: "Collect and present data"
+- **Document splitting** (analyze + extract + create files) → ONE task: "Split document into separate files"
- **Different topics** (email + flowers) → SEPARATE tasks: "Send formal email..." + "Order flowers from Fleurop for delivery to 123 Main St, include card message"
+### Common Single-Task Scenarios
+- **"Split document into sections"** → ONE task: "Split document into separate files"
+- **"Extract data and create report"** → ONE task: "Extract data and create report"
+- **"Analyze and summarize document"** → ONE task: "Analyze and summarize document"
+- **"Convert file to different format"** → ONE task: "Convert file to different format"
+
### Retry Handling
- **If retry request**: Analyze previous rounds to understand what failed
- **Learn from mistakes**: Improve the plan based on previous failures