From bdc87eb5c67613fc53ea07a3b42c9f51977f871d Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 14 Oct 2025 17:50:27 +0200
Subject: [PATCH] Add normalization service to merge structured data of any
format
---
modules/datamodels/datamodelDocument.py | 9 +-
.../serviceAi/subDocumentGeneration.py | 80 ++++++
.../serviceAi/subDocumentProcessing.py | 41 ++-
.../renderers/rendererBaseTemplate.py | 8 +-
.../renderers/rendererDocx.py | 36 +--
.../renderers/rendererPdf.py | 39 +--
.../renderers/rendererPptx.py | 78 +++---
.../renderers/rendererXlsx.py | 34 ++-
.../serviceGeneration/subPromptBuilder.py | 105 ++++----
.../mainServiceNormalization.py | 243 ++++++++++++++++++
.../processing/shared/placeholderFactory.py | 37 ++-
modules/workflows/workflowManager.py | 39 ++-
12 files changed, 585 insertions(+), 164 deletions(-)
create mode 100644 modules/services/serviceNormalization/mainServiceNormalization.py
diff --git a/modules/datamodels/datamodelDocument.py b/modules/datamodels/datamodelDocument.py
index a437b6f1..4c37c106 100644
--- a/modules/datamodels/datamodelDocument.py
+++ b/modules/datamodels/datamodelDocument.py
@@ -121,5 +121,10 @@ class JsonMergeResult(BaseModel):
metadata: Dict[str, Any] = Field(default_factory=dict, description="Merge process metadata")
-# Update forward references
-ListItem.model_rebuild()
+# Update forward references (compatible with Pydantic v1 and v2)
+try:
+ # Pydantic v2
+ ListItem.model_rebuild()
+except AttributeError:
+ # Pydantic v1
+ ListItem.update_forward_refs()
diff --git a/modules/services/serviceAi/subDocumentGeneration.py b/modules/services/serviceAi/subDocumentGeneration.py
index 750616e4..d9318f00 100644
--- a/modules/services/serviceAi/subDocumentGeneration.py
+++ b/modules/services/serviceAi/subDocumentGeneration.py
@@ -105,6 +105,12 @@ class SubDocumentGeneration:
if not isinstance(aiResponseJson, dict) or "sections" not in aiResponseJson:
raise Exception("AI response is not valid JSON document structure")
+ # Emit raw extracted data as a chat message attachment before rendering
+ try:
+ await self._postRawDataChatMessage(aiResponseJson, label="raw_extraction_single")
+ except Exception:
+ logger.warning("Failed to emit raw extraction chat message (single-file)")
+
# Generate filename from document metadata
parsedFilename = None
try:
@@ -211,6 +217,12 @@ class SubDocumentGeneration:
prompt, documents, options, outputFormat, title
)
+ # Emit raw extracted data as a chat message attachment before transformation/rendering
+ try:
+ await self._postRawDataChatMessage(ai_response, label="raw_extraction_multi")
+ except Exception:
+ logger.warning("Failed to emit raw extraction chat message (multi-file)")
+
# Process multiple documents
generated_documents = []
for i, doc_data in enumerate(ai_response.get("documents", [])):
@@ -457,3 +469,71 @@ Return only the JSON response.
except Exception as e:
logger.warning(f"Response validation failed with exception: {str(e)}")
return False
+
+ async def _postRawDataChatMessage(self, payload: Any, label: str = "raw_extraction") -> None:
+ """
+ Create a ChatMessage with the extracted raw JSON attached as a file so the user
+ has access to the data even if downstream processing fails.
+ """
+ try:
+ services = self.services
+ workflow = getattr(services, 'currentWorkflow', None)
+ if not workflow:
+ return
+
+ # Serialize payload
+ import json as _json
+ from datetime import datetime, UTC
+ ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
+ content_text = _json.dumps(payload, ensure_ascii=False, indent=2)
+ content_bytes = content_text.encode('utf-8')
+
+ # Store as file via component storage
+ file_name = f"{label}_{ts}.json"
+ file_item = services.interfaceDbComponent.createFile(
+ name=file_name,
+ mimeType="application/json",
+ content=content_bytes
+ )
+ services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
+
+ # Lookup file info for ChatDocument
+ file_info = services.workflow.getFileInfo(file_item.id)
+ doc = ChatDocument(
+ messageId="", # set after message creation
+ fileId=file_item.id,
+ fileName=file_info.get("fileName", file_name) if file_info else file_name,
+ fileSize=file_info.get("size", len(content_bytes)) if file_info else len(content_bytes),
+ mimeType=file_info.get("mimeType", "application/json") if file_info else "application/json"
+ )
+
+ # Create message referencing the file
+ messageData = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": "Raw extraction data saved",
+ "status": "data",
+ "sequenceNr": len(getattr(workflow, 'messages', []) or []) + 1,
+ "publishedAt": services.utils.getUtcTimestamp(),
+ "documentsLabel": label,
+ "documents": []
+ }
+ message = services.workflow.createMessage(messageData)
+ if not message:
+ return
+
+ # Persist ChatDocument with messageId
+ doc.messageId = message.id
+ services.interfaceDbChat.createDocument(doc.to_dict())
+
+ # Update message to include document
+ try:
+ if not message.documents:
+ message.documents = []
+ message.documents.append(doc)
+ services.workflow.updateMessage(message.id, {"documents": [d.to_dict() for d in message.documents]})
+ except Exception:
+ pass
+ except Exception:
+ # Non-fatal; ignore if storage or chat creation fails
+ return
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
index f86be535..de3a0f2f 100644
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ b/modules/services/serviceAi/subDocumentProcessing.py
@@ -175,6 +175,27 @@ class SubDocumentProcessing:
# Merge with JSON mode
mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options)
+
+ # Normalize merged JSON into a single canonical table
+ try:
+ from modules.services.serviceNormalization.mainServiceNormalization import NormalizationService
+ normalizer = NormalizationService(self.services)
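+                # Pipeline: discover table structures -> AI header mapping ->
+                # deterministic re-mapping -> row-count validation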
+ inventory = normalizer.discoverStructures(mergedJsonDocument)
+ # Use workflow id if available as cache key, else default
+ cacheKey = getattr(self.services, 'currentWorkflow', None)
+ cacheKey = getattr(cacheKey, 'id', 'workflow_run') if cacheKey else 'workflow_run'
+ # Provide the extraction/merge prompt context when available to help mapping
+ mergePrompt = prompt
+ mapping = await normalizer.requestHeaderMapping(inventory, cacheKey, None, mergePrompt)
+ canonical = normalizer.applyMapping(mergedJsonDocument, mapping)
+ report = normalizer.validateCanonical(canonical)
+ if report.get('success'):
+ mergedJsonDocument = canonical
+ else:
+ raise ValueError('Normalization produced zero rows')
+            except Exception:
+                # Fail fast: downstream rendering expects a single canonical table,
+                # so a normalization error aborts the merge path rather than
+                # silently continuing with the un-normalized JSON
+                raise
# Save merged JSON extraction content to debug file - only if debug enabled
try:
@@ -481,8 +502,16 @@ class SubDocumentProcessing:
self.services.utils.debugLogToFile(f"EXTRACTION PROMPT: {prompt}", "AI_SERVICE")
self.services.utils.debugLogToFile(f"EXTRACTION CONTEXT LENGTH: {len(part.data) if part.data else 0} characters", "AI_SERVICE")
+ # Strengthen prompt to forbid fabrication for text/container extraction
+ augmented_prompt = (
+ f"{prompt}\n\n"
+ "CRITICAL RULES (NO FABRICATION):\n"
+ "- Use ONLY content present in the provided CONTEXT.\n"
+ "- Do NOT create, infer, or guess values not explicitly in the context.\n"
+ "- If a value is missing, leave the cell empty or omit the row.\n"
+ )
request = AiCallRequest(
- prompt=prompt,
+ prompt=augmented_prompt,
context=part.data,
options=request_options
)
@@ -579,8 +608,16 @@ class SubDocumentProcessing:
logger.debug(f"AI PROMPT PREVIEW: {prompt[:300]}...")
logger.debug(f"AI CONTEXT PREVIEW: {part.data[:200] if part.data else 'None'}...")
+ # Strengthen prompt to forbid fabrication for text extraction
+ augmented_prompt_text = (
+ f"{prompt}\n\n"
+ "CRITICAL RULES (NO FABRICATION):\n"
+ "- Use ONLY content present in the provided CONTEXT.\n"
+ "- Do NOT create, infer, or guess values not explicitly in the context.\n"
+ "- If a value is missing, leave the cell empty or omit the row.\n"
+ )
request = AiCallRequest(
- prompt=prompt,
+ prompt=augmented_prompt_text,
context=part.data,
options=request_options
)
diff --git a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
index ed10c01b..b8158201 100644
--- a/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
+++ b/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py
@@ -92,11 +92,11 @@ class BaseRenderer(ABC):
def _get_section_type(self, section: Dict[str, Any]) -> str:
"""Get the type of a section."""
- return section.get("type", "paragraph")
+ return section.get("content_type", "paragraph")
- def _get_section_data(self, section: Dict[str, Any]) -> Dict[str, Any]:
- """Get the data of a section."""
- return section.get("data", {})
+ def _get_section_data(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
+ """Get the elements of a section."""
+ return section.get("elements", [])
def _get_section_id(self, section: Dict[str, Any]) -> str:
"""Get the ID of a section (if available)."""
diff --git a/modules/services/serviceGeneration/renderers/rendererDocx.py b/modules/services/serviceGeneration/renderers/rendererDocx.py
index 0e0417f9..d744b7e5 100644
--- a/modules/services/serviceGeneration/renderers/rendererDocx.py
+++ b/modules/services/serviceGeneration/renderers/rendererDocx.py
@@ -212,24 +212,26 @@ class RendererDocx(BaseRenderer):
def _render_json_section(self, doc: Document, section: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a single JSON section to DOCX using AI-generated styles."""
try:
- section_type = section.get("type", "paragraph")
- section_data = section.get("data", {})
+ section_type = section.get("content_type", "paragraph")
+ elements = section.get("elements", [])
- if section_type == "table":
- self._render_json_table(doc, section_data, styles)
- elif section_type == "bullet_list":
- self._render_json_bullet_list(doc, section_data, styles)
- elif section_type == "heading":
- self._render_json_heading(doc, section_data, styles)
- elif section_type == "paragraph":
- self._render_json_paragraph(doc, section_data, styles)
- elif section_type == "code_block":
- self._render_json_code_block(doc, section_data, styles)
- elif section_type == "image":
- self._render_json_image(doc, section_data, styles)
- else:
- # Fallback to paragraph for unknown types
- self._render_json_paragraph(doc, section_data, styles)
+ # Process each element in the section
+ for element in elements:
+ if section_type == "table":
+ self._render_json_table(doc, element, styles)
+ elif section_type == "bullet_list":
+ self._render_json_bullet_list(doc, element, styles)
+ elif section_type == "heading":
+ self._render_json_heading(doc, element, styles)
+ elif section_type == "paragraph":
+ self._render_json_paragraph(doc, element, styles)
+ elif section_type == "code_block":
+ self._render_json_code_block(doc, element, styles)
+ elif section_type == "image":
+ self._render_json_image(doc, element, styles)
+ else:
+ # Fallback to paragraph for unknown types
+ self._render_json_paragraph(doc, element, styles)
except Exception as e:
self.logger.warning(f"Error rendering section {section.get('id', 'unknown')}: {str(e)}")
diff --git a/modules/services/serviceGeneration/renderers/rendererPdf.py b/modules/services/serviceGeneration/renderers/rendererPdf.py
index 8dd1917d..dc3195ae 100644
--- a/modules/services/serviceGeneration/renderers/rendererPdf.py
+++ b/modules/services/serviceGeneration/renderers/rendererPdf.py
@@ -105,7 +105,7 @@ class RendererPdf(BaseRenderer):
sections = json_content.get("sections", [])
self.services.utils.debugLogToFile(f"PDF SECTIONS TO PROCESS: {len(sections)} sections", "PDF_RENDERER")
for i, section in enumerate(sections):
- self.services.utils.debugLogToFile(f"PDF SECTION {i}: type={section.get('type', 'unknown')}, id={section.get('id', 'unknown')}", "PDF_RENDERER")
+ self.services.utils.debugLogToFile(f"PDF SECTION {i}: content_type={section.get('content_type', 'unknown')}, id={section.get('id', 'unknown')}", "PDF_RENDERER")
section_elements = self._render_json_section(section, styles)
self.services.utils.debugLogToFile(f"PDF SECTION {i} ELEMENTS: {len(section_elements)} elements", "PDF_RENDERER")
story.extend(section_elements)
@@ -469,23 +469,28 @@ class RendererPdf(BaseRenderer):
"""Render a single JSON section to PDF elements using AI-generated styles."""
try:
section_type = self._get_section_type(section)
- section_data = self._get_section_data(section)
+ elements = self._get_section_data(section)
- if section_type == "table":
- return self._render_json_table(section_data, styles)
- elif section_type == "bullet_list":
- return self._render_json_bullet_list(section_data, styles)
- elif section_type == "heading":
- return self._render_json_heading(section_data, styles)
- elif section_type == "paragraph":
- return self._render_json_paragraph(section_data, styles)
- elif section_type == "code_block":
- return self._render_json_code_block(section_data, styles)
- elif section_type == "image":
- return self._render_json_image(section_data, styles)
- else:
- # Fallback to paragraph for unknown types
- return self._render_json_paragraph(section_data, styles)
+ # Process each element in the section
+ all_elements = []
+ for element in elements:
+ if section_type == "table":
+ all_elements.extend(self._render_json_table(element, styles))
+ elif section_type == "bullet_list":
+ all_elements.extend(self._render_json_bullet_list(element, styles))
+ elif section_type == "heading":
+ all_elements.extend(self._render_json_heading(element, styles))
+ elif section_type == "paragraph":
+ all_elements.extend(self._render_json_paragraph(element, styles))
+ elif section_type == "code_block":
+ all_elements.extend(self._render_json_code_block(element, styles))
+ elif section_type == "image":
+ all_elements.extend(self._render_json_image(element, styles))
+ else:
+ # Fallback to paragraph for unknown types
+ all_elements.extend(self._render_json_paragraph(element, styles))
+
+ return all_elements
except Exception as e:
self.logger.warning(f"Error rendering section {self._get_section_id(section)}: {str(e)}")
diff --git a/modules/services/serviceGeneration/renderers/rendererPptx.py b/modules/services/serviceGeneration/renderers/rendererPptx.py
index 2ac9cd11..26c707ca 100644
--- a/modules/services/serviceGeneration/renderers/rendererPptx.py
+++ b/modules/services/serviceGeneration/renderers/rendererPptx.py
@@ -600,29 +600,33 @@ JSON ONLY. NO OTHER TEXT."""
try:
# Get section title from data or use default
section_title = "Untitled Section"
- if section.get("type") == "heading":
- section_title = section.get("data", {}).get("text", "Untitled Section")
+ if section.get("content_type") == "heading":
+ # Extract text from elements array
+ for element in section.get("elements", []):
+ if isinstance(element, dict) and "text" in element:
+ section_title = element.get("text", "Untitled Section")
+ break
elif section.get("title"):
section_title = section.get("title")
- content_type = section.get("type", "paragraph")
- section_data = section.get("data", {})
+ content_type = section.get("content_type", "paragraph")
+ elements = section.get("elements", [])
# Build slide content based on section type
content_parts = []
if content_type == "table":
- content_parts.append(self._format_table_for_slide(section_data))
+ content_parts.append(self._format_table_for_slide(elements))
elif content_type == "list":
- content_parts.append(self._format_list_for_slide(section_data))
+ content_parts.append(self._format_list_for_slide(elements))
elif content_type == "heading":
- content_parts.append(self._format_heading_for_slide(section_data))
+ content_parts.append(self._format_heading_for_slide(elements))
elif content_type == "paragraph":
- content_parts.append(self._format_paragraph_for_slide(section_data))
+ content_parts.append(self._format_paragraph_for_slide(elements))
elif content_type == "code":
- content_parts.append(self._format_code_for_slide(section_data))
+ content_parts.append(self._format_code_for_slide(elements))
else:
- content_parts.append(self._format_paragraph_for_slide(section_data))
+ content_parts.append(self._format_paragraph_for_slide(elements))
# Combine content parts
slide_content = "\n\n".join(filter(None, content_parts))
@@ -636,11 +640,17 @@ JSON ONLY. NO OTHER TEXT."""
logger.warning(f"Error creating slide from section: {str(e)}")
return None
- def _format_table_for_slide(self, table_data: Dict[str, Any]) -> str:
+ def _format_table_for_slide(self, elements: List[Dict[str, Any]]) -> str:
"""Format table data for slide presentation."""
try:
- headers = table_data.get("headers", [])
- rows = table_data.get("rows", [])
+ # Extract table data from elements array
+ headers = []
+ rows = []
+ for element in elements:
+ if isinstance(element, dict) and "headers" in element and "rows" in element:
+ headers = element.get("headers", [])
+ rows = element.get("rows", [])
+ break
if not headers:
return ""
@@ -805,8 +815,8 @@ JSON ONLY. NO OTHER TEXT."""
current_slide_title = "Content Overview"
for section in sections:
- section_type = section.get("type", "paragraph")
- section_data = section.get("data", {})
+ section_type = section.get("content_type", "paragraph")
+ elements = section.get("elements", [])
if section_type == "heading":
# If we have accumulated content, create a slide
@@ -818,7 +828,10 @@ JSON ONLY. NO OTHER TEXT."""
current_slide_content = []
# Start new slide with heading as title
- current_slide_title = section_data.get("text", "Untitled Section")
+ for element in elements:
+ if isinstance(element, dict) and "text" in element:
+ current_slide_title = element.get("text", "Untitled Section")
+ break
else:
# Add content to current slide
formatted_content = self._format_section_content(section)
@@ -841,21 +854,26 @@ JSON ONLY. NO OTHER TEXT."""
def _format_section_content(self, section: Dict[str, Any]) -> str:
"""Format section content for slide presentation."""
try:
- content_type = section.get("type", "paragraph")
- section_data = section.get("data", {})
+ content_type = section.get("content_type", "paragraph")
+ elements = section.get("elements", [])
- if content_type == "table":
- return self._format_table_for_slide(section_data)
- elif content_type == "list":
- return self._format_list_for_slide(section_data)
- elif content_type == "heading":
- return self._format_heading_for_slide(section_data)
- elif content_type == "paragraph":
- return self._format_paragraph_for_slide(section_data)
- elif content_type == "code":
- return self._format_code_for_slide(section_data)
- else:
- return self._format_paragraph_for_slide(section_data)
+ # Process each element in the section
+ content_parts = []
+ for element in elements:
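+                # the formatters take an elements list, so wrap each single element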
+ if content_type == "table":
+ content_parts.append(self._format_table_for_slide([element]))
+ elif content_type == "list":
+ content_parts.append(self._format_list_for_slide([element]))
+ elif content_type == "heading":
+ content_parts.append(self._format_heading_for_slide([element]))
+ elif content_type == "paragraph":
+ content_parts.append(self._format_paragraph_for_slide([element]))
+ elif content_type == "code":
+ content_parts.append(self._format_code_for_slide([element]))
+ else:
+ content_parts.append(self._format_paragraph_for_slide([element]))
+
+ return "\n\n".join(filter(None, content_parts))
except Exception as e:
logger.warning(f"Error formatting section content: {str(e)}")
diff --git a/modules/services/serviceGeneration/renderers/rendererXlsx.py b/modules/services/serviceGeneration/renderers/rendererXlsx.py
index 90487bcd..ed11dd92 100644
--- a/modules/services/serviceGeneration/renderers/rendererXlsx.py
+++ b/modules/services/serviceGeneration/renderers/rendererXlsx.py
@@ -457,7 +457,7 @@ class RendererXlsx(BaseRenderer):
# Add additional sheets for other content types
content_types = set()
for section in sections:
- content_type = section.get("type", "paragraph")
+ content_type = section.get("content_type", "paragraph")
content_types.add(content_type)
if "table" in content_types and len(table_sections) == 1:
@@ -658,25 +658,21 @@ class RendererXlsx(BaseRenderer):
start_row += 1
# Process section based on type
- section_type = section.get("type", "paragraph")
+ section_type = section.get("content_type", "paragraph")
- if section_type == "table":
- # Handle table section directly
- table_data = section.get("data", {})
- if table_data:
- start_row = self._add_table_to_excel(sheet, table_data, styles, start_row)
- else:
- # Handle other section types
- elements = section.get("elements", [])
- for element in elements:
- if section_type == "list":
- start_row = self._add_list_to_excel(sheet, element, styles, start_row)
- elif section_type == "paragraph":
- start_row = self._add_paragraph_to_excel(sheet, element, styles, start_row)
- elif section_type == "heading":
- start_row = self._add_heading_to_excel(sheet, element, styles, start_row)
- else:
- start_row = self._add_paragraph_to_excel(sheet, element, styles, start_row)
+ # Handle all section types using elements array
+ elements = section.get("elements", [])
+ for element in elements:
+ if section_type == "table":
+ start_row = self._add_table_to_excel(sheet, element, styles, start_row)
+ elif section_type == "list":
+ start_row = self._add_list_to_excel(sheet, element, styles, start_row)
+ elif section_type == "paragraph":
+ start_row = self._add_paragraph_to_excel(sheet, element, styles, start_row)
+ elif section_type == "heading":
+ start_row = self._add_heading_to_excel(sheet, element, styles, start_row)
+ else:
+ start_row = self._add_paragraph_to_excel(sheet, element, styles, start_row)
return start_row
diff --git a/modules/services/serviceGeneration/subPromptBuilder.py b/modules/services/serviceGeneration/subPromptBuilder.py
index 7b7342bc..e0faa029 100644
--- a/modules/services/serviceGeneration/subPromptBuilder.py
+++ b/modules/services/serviceGeneration/subPromptBuilder.py
@@ -286,7 +286,7 @@ async def buildExtractionPrompt(
from .subJsonSchema import get_document_subJsonSchema
jsonSchema = get_document_subJsonSchema()
- # Generic block for JSON extraction
+ # Generic block for JSON extraction - use proper schema instead of hardcoded template
genericIntro = f"""
{extractionIntent}
@@ -295,44 +295,7 @@ You are extracting structured content from documents and must respond with valid
CRITICAL: You must respond with valid JSON only. No additional text, explanations, markdown formatting, code blocks, or any other content outside the JSON structure. Do not use ``` markers or any other formatting.
Extract the actual data from the source documents and structure it as JSON with this format:
-{{
- "metadata": {{
- "title": "Document Title",
- "version": "1.0"
- }},
- "sections": [
- {{
- "id": "section_1",
- "type": "heading",
- "data": {{
- "level": 1,
- "text": "Heading Text"
- }}
- }},
- {{
- "id": "section_2",
- "type": "table",
- "data": {{
- "headers": ["Column1", "Column2"],
- "rows": [["Data1", "Data2"], ["Data3", "Data4"]]
- }}
- }},
- {{
- "id": "section_3",
- "type": "bullet_list",
- "data": {{
- "items": ["Item 1", "Item 2", "Item 3"]
- }}
- }},
- {{
- "id": "section_4",
- "type": "paragraph",
- "data": {{
- "text": "Paragraph content here"
- }}
- }}
- ]
-}}
+{json.dumps(jsonSchema, indent=2)}
Content Types to Extract:
1. Tables: Extract all rows and columns with proper headers
@@ -485,8 +448,8 @@ def _getFormatRules(outputFormat: str) -> str:
async def _parseExtractionIntent(userPrompt: str, outputFormat: str, aiService=None, services=None) -> str:
"""
- Use AI to extract the core content intention from the user prompt.
- Focus on WHAT the user wants to extract, not HOW to format it.
+ Use AI to extract a rich, structured extraction intent from the user prompt.
+ Include language, normalization, structure needs, headers, formats, row strategy, and multi-file guidance.
"""
if not aiService:
# Fallback if no AI service available
@@ -496,18 +459,46 @@ async def _parseExtractionIntent(userPrompt: str, outputFormat: str, aiService=N
# Protect userPrompt from injection by escaping quotes and newlines
safeUserPrompt = userPrompt.replace('"', '\\"').replace("'", "\\'").replace('\n', ' ').replace('\r', ' ')
- # Simple AI call to extract the intention
+ # Rich analysis to derive a complete extraction intent and structure guidance
extractionPrompt = f"""
-Extract the core content intention from this user request. Focus on WHAT raw data/content they want extracted.
+Analyze the user's request and produce a RICH extraction intent. Return ONLY JSON.
+
+Goals:
+- Detect language and normalize the request into a full, explicit instruction (no summary; preserve all constraints and details).
+- Decide if structured data is required; if so, define the target structure precisely (headers, order, formats, row strategy).
+- Identify whether multi-file output is appropriate and how to split and name the files.
User request: "{safeUserPrompt}"
-Return only the content intention in a simple format like "Extract: [content description]"
-Focus on extracting raw data, tables, lists, and factual content - NOT summaries or analysis.
-If the user mentions a table, extract the actual table data with rows and columns.
-If the user mentions a list, extract the actual list items.
-IMPORTANT: Preserve any language requirements in your response.
-Do not include formatting instructions, file types, or output methods.
+Return JSON in this exact shape:
+{{
+ "detectedLanguage": "de|en|fr|it|...",
+ "normalizedRequest": "Full explicit instruction in detected language",
+ "requiresStructuredData": true|false,
+ "targetStructure": "table|list|mixed|unstructured",
+ "table": {{
+ "headers": ["Header1", "Header2", "..."],
+ "headerOrderStrict": true|false,
+ "rowStrategy": "one_row_per_document|one_row_per_entity|one_row_per_vat_rate|custom",
+ "formats": {{
+ "dateFormat": "DD.MM.YYYY|YYYY-MM-DD|...",
+ "amountDecimals": 2,
+ "currencyFormat": "code|symbol",
+ "idMasking": "none|last4|custom"
+ }}
+ }},
+ "multiFile": true|false,
+ "fileSplitStrategy": "single|per_entity|by_section|by_criteria|custom",
+ "fileNamingPattern": "suggested pattern for filenames",
+ "constraints": ["List of critical constraints to enforce"],
+ "reasoning": "Brief justification (one sentence)"
+}}
+
+Rules:
+- Preserve user terminology and language in normalizedRequest.
+- If the user listed columns/fields, copy them exactly into table.headers and set headerOrderStrict=true.
+- If the user implies separate rows for rates/entities, set an appropriate rowStrategy (e.g., one_row_per_vat_rate).
+- If no structure is required, set requiresStructuredData=false and targetStructure="unstructured".
"""
# Call AI service to extract intention
@@ -523,12 +514,24 @@ Do not include formatting instructions, file types, or output methods.
result = response.content if response else ""
services.utils.debugLogToFile(f"DEBUG: Extraction intent processed", "PROMPT_BUILDER")
- return result if result else f"Extract all relevant content from the document according to the user's requirements: {userPrompt}"
+ # Try to extract and pretty print JSON
+ if result:
+ import re, json as _json
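+            # greedy match grabs the outermost {...} span so prose around the JSON reply is tolerated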
+ match = re.search(r'\{[\s\S]*\}', result)
+ if match:
+ try:
+ obj = _json.loads(match.group(0))
+ return _json.dumps(obj, ensure_ascii=False, indent=2)
+ except Exception:
+ pass
+
+ # Fallback to previous simple format
+ return f"Extract: {safeUserPrompt}"
except Exception as e:
# Fallback on any error - preserve user prompt for language instructions
services.utils.debugLogToFile(f"DEBUG: AI extraction intent failed: {str(e)}", "PROMPT_BUILDER")
- return f"Extract all relevant content from the document according to the user's requirements: {userPrompt}"
+ return f"Extract: {userPrompt}"
diff --git a/modules/services/serviceNormalization/mainServiceNormalization.py b/modules/services/serviceNormalization/mainServiceNormalization.py
new file mode 100644
index 00000000..deb93351
--- /dev/null
+++ b/modules/services/serviceNormalization/mainServiceNormalization.py
@@ -0,0 +1,243 @@
+import json
+import os
+from typing import Any, Dict, List, Set
+from datetime import datetime, UTC
+
+
+class NormalizationService:
+ """
+ Produces a single canonical table in merged JSON using an AI-provided header mapping
+ and deterministic, in-code value normalization. No language heuristics in code.
+ """
+
+ def __init__(self, services):
+ self.services = services
+
+ # Public API
+ def discoverStructures(self, mergedJson: Dict[str, Any]) -> Dict[str, Any]:
+ headers: Set[str] = set()
+ samples: Dict[str, List[str]] = {}
+
+ sections = mergedJson.get("sections", []) if isinstance(mergedJson, dict) else []
+ for section in sections:
+ if not isinstance(section, dict):
+ continue
+
+ # Use only the fundamental agreed JSON structure: content_type/elements
+ if section.get("content_type") != "table":
+ continue
+
+ # Extract table data from elements array
+ for element in section.get("elements", []):
+ if isinstance(element, dict) and "headers" in element and "rows" in element:
+ hdrs = element.get("headers") or []
+ rows = element.get("rows") or []
+ break
+ else:
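+                # for/else: runs only when no table element was found; skip this section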
+ continue
+ for h in hdrs:
+ if not isinstance(h, str):
+ continue
+ headers.add(h)
+ # collect small value samples by column index
+ for row in rows[:5]:
+ if not isinstance(row, list):
+ continue
+ for i, value in enumerate(row):
+ headerName = hdrs[i] if i < len(hdrs) else f"col_{i}"
+ if headerName not in samples:
+ samples[headerName] = []
+ if len(samples[headerName]) < 5:
+ samples[headerName].append(str(value))
+
+ return {
+ "tableHeaders": sorted(list(headers)),
+ "headerSamples": samples,
+ }
+
+ async def requestHeaderMapping(self, inventory: Dict[str, Any], cacheKey: str, canonicalSpec: Dict[str, Any] | None = None, mergePrompt: str | None = None) -> Dict[str, Any]:
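+        # cacheKey is currently unused; kept in the signature, presumably for a future mapping cache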
+
+ # Allow caller to specify any canonical schema. If none provided, default to discovered headers.
+ if canonicalSpec is None:
+ canonicalSpec = {
+ "canonicalHeaders": inventory.get("tableHeaders", []),
+ "constraints": {}
+ }
+
+        # Escape single quotes so the merge prompt context can be safely wrapped in single quotes below
+ protectedMerge = None
+ if mergePrompt:
+ try:
+ protectedMerge = str(mergePrompt).replace("'", "\\'")
+ except Exception:
+ protectedMerge = str(mergePrompt)
+
+ prompt = (
+ "You are a mapping generator. Return ONLY JSON.\n\n"
+ "Given discovered headers and sample values, map them to the canonical headers.\n"
+ "Do not invent fields. Use null if no mapping. Provide normalization policy.\n\n"
+ f"CANONICAL_SPEC:\n{json.dumps(canonicalSpec, ensure_ascii=False, indent=2)}\n\n"
+ f"HEADERS_DISCOVERED:\n{json.dumps(inventory, ensure_ascii=False, indent=2)}\n\n"
+ + (f"MERGE_PROMPT_CONTEXT (protected):\n'{protectedMerge}'\n\n" if protectedMerge is not None else "") +
+ "REPLY JSON SHAPE:\n(Example)\n"
+            "{\n  \"mappings\": {\"<discoveredHeader>\": \"<canonicalHeader>|null\"},\n"
+ " \"normalizationPolicy\": {\n \"TotalAmount\": {\"decimalSeparator\": \",\"|\".\"},\n"
+ " \"Currency\": {\"stripSymbols\": true},\n"
+ " \"Date\": {\"formats\": [\"DD.MM.YYYY\",\"YYYY-MM-DD\"]}\n }\n}\n"
+ )
+
+ response = await self.services.ai.callAi(prompt=prompt)
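+        # crude but tolerant JSON extraction: slice from the first '{' to the last '}'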
+ js = response[response.find('{'):response.rfind('}') + 1] if response else '{}'
+ mapping = json.loads(js)
+ # Normalize key naming from AI: prefer single key "mapping"
+        if "mapping" not in mapping and isinstance(mapping.get("mappings"), dict):
+            mapping["mapping"] = mapping.pop("mappings")
+ # Ensure canonicalHeaders present in mapping for downstream use
+ if "canonicalHeaders" not in mapping:
+ mapping["canonicalHeaders"] = canonicalSpec.get("canonicalHeaders", [])
+
+ # debug artifact
+ self._writeDebugArtifact("mapping.json", mapping)
+ return mapping
+
+ def applyMapping(self, mergedJson: Dict[str, Any], mappingSpec: Dict[str, Any]) -> Dict[str, Any]:
+ mappings = (mappingSpec or {}).get("mapping", {})
+ policy = (mappingSpec or {}).get("normalizationPolicy", {})
+
+ # Prefer headers provided by mapping (generic across domains)
+ canonicalHeaders = (mappingSpec or {}).get("canonicalHeaders") or []
+ if not canonicalHeaders:
+ # Fallback to union of mapped targets
+ canonicalHeaders = sorted(list({t for t in mappings.values() if t}))
+
+ rows: List[List[str]] = []
+ sections = mergedJson.get("sections", []) if isinstance(mergedJson, dict) else []
+ for section in sections:
+ # Use only the fundamental agreed JSON structure: content_type/elements
+            if not isinstance(section, dict) or section.get("content_type") != "table":
+ continue
+
+ # Extract table data from elements array
+ for element in section.get("elements", []):
+ if isinstance(element, dict) and "headers" in element and "rows" in element:
+ sourceHeaders = element.get("headers") or []
+ sourceRows = element.get("rows") or []
+ break
+ else:
+ continue
+ if not sourceHeaders or not sourceRows:
+ continue
+
+ # Build index map: canonical -> source index or None
+ indexMap: Dict[str, int] = {}
+ for ci, ch in enumerate(canonicalHeaders):
+ srcIndex = None
+ for si, sh in enumerate(sourceHeaders):
+ # Prefer explicit mapping target; fallback to identity when names match
+ target = mappings.get(sh)
+ if target is None and sh == ch:
+ target = ch
+ if target == ch:
+ srcIndex = si
+ break
+ indexMap[ch] = srcIndex
+
+ # Transform rows
+ for r in sourceRows:
+ canonicalRow: List[str] = []
+ for ch in canonicalHeaders:
+ idx = indexMap.get(ch)
+ value = r[idx] if (idx is not None and idx < len(r)) else ""
+ canonicalRow.append(self._normalizeValue(ch, value, policy))
+                # keep the row only if at least one field is non-empty
+ if any(v.strip() for v in canonicalRow):
+ rows.append(canonicalRow)
+
+ canonical = {
+ "metadata": {
+ "title": mergedJson.get("metadata", {}).get("title", "Merged Document"),
+ "source_documents": mergedJson.get("metadata", {}).get("source_documents", [])
+ },
+ "sections": [
+ {
+ "id": "canonical_table_1",
+ "content_type": "table",
+ "elements": [
+ {
+ "headers": canonicalHeaders,
+ "rows": rows
+ }
+ ],
+ "order": 1
+ }
+ ]
+ }
+
+ # debug artifact
+ self._writeDebugArtifact("canonical_merged.json", canonical)
+ return canonical
+
+ def validateCanonical(self, canonicalJson: Dict[str, Any]) -> Dict[str, Any]:
+ rows = []
+ try:
+ sections = canonicalJson.get("sections", [])
+ for s in sections:
+ if s.get("content_type") == "table":
+ # Extract rows from elements array
+ for element in s.get("elements", []):
+ if isinstance(element, dict) and "rows" in element:
+ rows.extend(element.get("rows", []))
+ except Exception:
+ rows = []
+ report = {
+ "rowCount": len(rows),
+ "success": len(rows) > 0
+ }
+ self._writeDebugArtifact("normalization_report.json", report)
+ return report
+
+ # Internal helpers
+    def _normalizeValue(self, canonicalHeader: str, value: Any, policy: Dict[str, Any]) -> str:
+        if value is None:
+            return ""
+        text = str(value).strip()
+        # Generic normalization guided by policy; avoid domain specifics.
+        # The AI returns per-field policies (see the prompt's example shape),
+        # so honor those alongside the optional global lists.
+        raw = policy.get(canonicalHeader)
+        fieldPolicy = raw if isinstance(raw, dict) else {}
+        isNumeric = (canonicalHeader in (policy.get("numericFields") or [])
+                     or "decimalSeparator" in fieldPolicy)
+        if isNumeric:
+            dec = (fieldPolicy.get("decimalSeparator")
+                   or (policy.get("numeric") or {}).get("decimalSeparator")
+                   or ".")
+            if dec == "," and "," in text:
+                # e.g. "1.234,56" -> "1234.56"
+                text = text.replace(".", "").replace(",", ".")
+            text = ''.join(ch for ch in text if ch.isdigit() or ch in ['.', '-', '+'])
+        elif (fieldPolicy.get("stripSymbols")
+              or ((policy.get("text") or {}).get("stripSymbols")
+                  and canonicalHeader in ((policy.get("text") or {}).get("applyTo") or []))):
+            text = ''.join(ch for ch in text if ch.isalpha())
+            text = text.upper()
+        return text
+
+ def _writeDebugArtifact(self, fileName: str, obj: Any) -> None:
+ try:
+ debugEnabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
+ if not debugEnabled:
+ return
+ root = "./test-chat/ai"
+ os.makedirs(root, exist_ok=True)
+ # Prefix timestamp for files that are frequently overwritten
+ ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
+ if fileName in ("mapping.json", "canonical_merged.json"):
+ outName = f"{ts}_{fileName}"
+ else:
+ outName = fileName
+ path = os.path.join(root, outName)
+ with open(path, "w", encoding="utf-8") as f:
+ if isinstance(obj, (dict, list)):
+ f.write(json.dumps(obj, ensure_ascii=False, indent=2))
+ else:
+ f.write(str(obj))
+ except Exception:
+ pass
diff --git a/modules/workflows/processing/shared/placeholderFactory.py b/modules/workflows/processing/shared/placeholderFactory.py
index 225edbe7..75e143f7 100644
--- a/modules/workflows/processing/shared/placeholderFactory.py
+++ b/modules/workflows/processing/shared/placeholderFactory.py
@@ -42,16 +42,27 @@ def extractUserPrompt(context: Any) -> str:
Fallback to the task_step objective.
"""
try:
- # Prefer services.currentUserPrompt when accessible through context
services = getattr(context, 'services', None)
- if services and getattr(services, 'currentUserPrompt', None):
- return services.currentUserPrompt
- except Exception:
- pass
- if hasattr(context, 'task_step') and context.task_step:
- return context.task_step.objective or 'No request specified'
- return 'No request specified'
+ # Determine raw user prompt from services or task_step
+ rawPrompt = None
+ if services and getattr(services, 'currentUserPrompt', None):
+ rawPrompt = services.currentUserPrompt
+ elif hasattr(context, 'task_step') and context.task_step:
+ rawPrompt = context.task_step.objective or 'No request specified'
+ else:
+ rawPrompt = 'No request specified'
+
+ # Prefer values computed at workflow start by WorkflowManager analyzer
+ normalized = getattr(services, 'currentUserPromptNormalized', None) if services else None
+ if normalized:
+ return normalized
+ return rawPrompt
+ except Exception:
+ # Robust fallback behavior
+ if hasattr(context, 'task_step') and context.task_step:
+ return context.task_step.objective or 'No request specified'
+ return 'No request specified'
def extractWorkflowHistory(service: Any, context: Any) -> str:
"""Extract workflow history from context. Maps to {{KEY:WORKFLOW_HISTORY}}
@@ -99,7 +110,15 @@ def extractAvailableMethods(service: Any) -> str:
def extractUserLanguage(service: Any) -> str:
"""Extract user language from service. Maps to {{KEY:USER_LANGUAGE}}"""
- return service.user.language if service and service.user else 'en'
+ try:
+ # Prefer detected language if available
+ if service and getattr(service, 'currentUserLanguage', None):
+ return service.currentUserLanguage
+ return service.user.language if service and service.user else 'en'
+ except Exception:
+ return 'en'
+
+# Normalization now happens centrally in WorkflowManager._sendFirstMessage; no AI call here.
def _computeMessageSummary(msg) -> str:
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index 5a3098b1..7b52ffe9 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -216,23 +216,23 @@ class WorkflowManager:
# Update the message with documents in database
self.services.workflow.updateMessage(message.id, {"documents": [doc.to_dict() for doc in documents]})
- # Analyze the user's input to extract intent and offload bulky context into documents
+ # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents
try:
analyzerPrompt = (
- "You are an input analyzer. Split the user's message into:\n"
- "1) intent: the user's core request in one concise paragraph, normalized to the user's language.\n"
- "2) contextItems: supportive data to attach as separate documents if significantly larger than the intent. "
- "Include large literal data blocks, long lists/tables, code/JSON blocks, quoted transcripts, CSV fragments, or detailed specs. "
- "Keep URLs in the intent unless they include large pasted content.\n\n"
+ "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
+ "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
+ "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
+ "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
+ "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n\n"
"Rules:\n"
- "- If total content length (intent + data) is less than 10% of the model's max tokens, do not extract; "
- "return an empty contextItems and keep a compact, self-contained intent.\n"
- "- If content exceeds that, move bulky parts into contextItems, keeping the intent short and clear.\n"
- "- Preserve critical references (URLs, filenames) in the intent.\n"
- "- Normalize the intent to the detected language. If mixed-language, use the primary detected language and normalize.\n\n"
- "Output JSON only (no markdown):\n"
+ "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
+ "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
+ "- Preserve critical references (URLs, filenames) in intent.\n"
+ "- Normalize to the primary detected language if mixed-language.\n\n"
+ "Return ONLY JSON (no markdown) with this shape:\n"
"{\n"
- " \"detectedLanguage\": \"en\",\n"
+ " \"detectedLanguage\": \"de|en|fr|it|...\",\n"
+ " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
" \"intent\": \"Concise normalized request...\",\n"
" \"contextItems\": [\n"
" {\n"
@@ -249,6 +249,7 @@ class WorkflowManager:
aiResponse = await self.services.ai.callAi(prompt=analyzerPrompt)
detectedLanguage = None
+ normalizedRequest = None
intentText = userInput.prompt
contextItems = []
@@ -260,6 +261,7 @@ class WorkflowManager:
if jsonStart != -1 and jsonEnd > jsonStart:
parsed = json.loads(aiResponse[jsonStart:jsonEnd])
detectedLanguage = parsed.get('detectedLanguage') or None
+ normalizedRequest = parsed.get('normalizedRequest') or None
if parsed.get('intent'):
intentText = parsed.get('intent')
contextItems = parsed.get('contextItems') or []
@@ -269,7 +271,18 @@ class WorkflowManager:
# Update services state
if detectedLanguage and isinstance(detectedLanguage, str):
self._setUserLanguage(detectedLanguage)
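+            # Expose the detected language on services so placeholder extraction
+            # can prefer it over the user's profile default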
+ try:
+ setattr(self.services, 'currentUserLanguage', detectedLanguage)
+ except Exception:
+ pass
self.services.currentUserPrompt = intentText or userInput.prompt
+ try:
+ if normalizedRequest:
+ setattr(self.services, 'currentUserPromptNormalized', normalizedRequest)
+ if contextItems is not None:
+ setattr(self.services, 'currentUserContextItems', contextItems)
+ except Exception:
+ pass
# Telemetry (sizes and counts)
try: