gateway/modules/workflows/processing/adaptive/contentValidator.py

918 lines
52 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
# contentValidator.py
# Content validation for adaptive Dynamic mode
# Generic, document-aware validation system
import logging
import json
import base64
import re
from typing import List, Dict, Any, Optional
# Module-level logger (standard per-module pattern)
logger = logging.getLogger(__name__)
# Configuration constants
# NOTE(review): neither constant below is referenced in this portion of the
# file — presumably used further down or by callers; confirm before removing.
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024 # 50KB threshold
PREVIEW_SAMPLE_SIZE = 1024 # 1KB preview for large documents
class ContentValidator:
"""Validates delivered content against user intent - generic and document-aware"""
def __init__(self, services=None, learningEngine=None):
self.services = services
self.learningEngine = learningEngine
async def validateContent(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None, actionParameters: Optional[Dict[str, Any]] = None, actionHistory: Optional[List[Dict[str, Any]]] = None, context: Optional[Any] = None) -> Dict[str, Any]:
    """Validates delivered content against user intent using AI (single attempt; parse-or-fail)

    Thin public entry point that delegates directly to ``_validateWithAI``;
    any failure there (empty/unparseable AI response) propagates to the caller.

    Args:
        documents: List of documents to validate
        intent: Workflow-level intent dict (for format requirements)
        taskStep: Optional TaskStep object (preferred source for objective)
        actionName: Optional action name (e.g., "ai.process", "ai.webResearch") that created the documents
        actionParameters: Optional action parameters used during execution (e.g., {"columnsPerRow": 10, "researchDepth": "deep"})
        actionHistory: Optional list of previously executed actions in the workflow (for multi-step workflow context)
        context: Optional context object to access all documents delivered in the current round

    Returns:
        Normalized validation result dict produced by ``_validateWithAI``.
    """
    return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters, actionHistory, context)
def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
"""Summarize JSON document structure for validation - extracts main objects, statistics, captions, and IDs."""
try:
if not isinstance(jsonData, dict):
return {"type": "non-dict", "preview": str(jsonData)[:200]}
summary = {
"metadata": {},
"sections": [],
"statistics": {}
}
# Extract metadata - include ALL metadata fields (generic for all action types)
metadata = jsonData.get("metadata", {})
if metadata and isinstance(metadata, dict):
# Include all metadata fields, not just specific ones
summary["metadata"] = dict(metadata)
# Extract documents array (if present)
documents = jsonData.get("documents", [])
if documents:
summary["statistics"]["documentCount"] = len(documents)
# Process first document (most common case)
if len(documents) > 0:
doc = documents[0]
docSections = doc.get("sections", [])
summary["statistics"]["sectionCount"] = len(docSections)
# Summarize sections
for section in docSections:
sectionSummary = {
"id": section.get("id"),
"content_type": section.get("content_type"),
"title": section.get("title"),
"order": section.get("order")
}
# Get elements for processing
elements = section.get("elements", [])
# For tables: extract caption and statistics
if section.get("content_type") == "table":
if elements and isinstance(elements, list) and len(elements) > 0:
tableElement = elements[0]
content = tableElement.get("content", {})
if isinstance(content, dict):
headers = content.get("headers", [])
rows = content.get("rows", [])
else:
headers = tableElement.get("headers", [])
rows = tableElement.get("rows", [])
if headers:
sectionSummary["columnCount"] = len(headers)
sectionSummary["headers"] = headers # Include headers for context
if rows:
sectionSummary["rowCount"] = len(rows)
sectionSummary["caption"] = tableElement.get("caption") or (content.get("caption") if isinstance(content, dict) else None)
# For lists and bullet_lists: extract item count
elif section.get("content_type") in ["list", "bullet_list"]:
if elements and isinstance(elements, list) and len(elements) > 0:
listElement = elements[0]
content = listElement.get("content", {})
if isinstance(content, dict):
items = content.get("items", [])
else:
items = listElement.get("items", [])
if items:
sectionSummary["itemCount"] = len(items)
# For paragraphs/headings: extract text statistics (no preview for security)
elif section.get("content_type") in ["paragraph", "heading"]:
if elements and isinstance(elements, list) and len(elements) > 0:
textElement = elements[0]
content = textElement.get("content", {})
if isinstance(content, dict):
text = content.get("text", "")
else:
text = textElement.get("text", "")
if text:
sectionSummary["textLength"] = len(text)
sectionSummary["wordCount"] = len(text.split())
# Also check for text length if available directly in section
if section.get("textLength"):
sectionSummary["textLength"] = section.get("textLength")
# For code blocks: extract code statistics (no preview for security)
elif section.get("content_type") == "code_block":
if elements and isinstance(elements, list) and len(elements) > 0:
codeElement = elements[0]
content = codeElement.get("content", {})
if isinstance(content, dict):
code = content.get("code", "")
language = content.get("language", "")
if code:
sectionSummary["codeLength"] = len(code)
sectionSummary["codeLineCount"] = code.count('\n') + 1
if language:
sectionSummary["language"] = language
# Wenn contentPartIds vorhanden sind, aber keine elements: Füge ContentParts-Metadaten hinzu
contentPartIds = section.get("contentPartIds", [])
if contentPartIds and not elements:
# Prüfe ob contentPartsMetadata vorhanden ist
contentPartsMetadata = section.get("contentPartsMetadata", [])
if contentPartsMetadata:
sectionSummary["contentPartsMetadata"] = contentPartsMetadata
else:
# Fallback: Zeige nur IDs wenn Metadaten nicht verfügbar
sectionSummary["contentPartIds"] = contentPartIds
sectionSummary["note"] = "ContentParts referenced but metadata not available"
# Include any additional fields from section (generic approach)
# This ensures all action-specific fields are preserved
# BUT exclude type-specific KPIs that don't belong to this content_type
contentType = section.get("content_type", "")
# Define KPIs that are ONLY valid for specific types
typeExclusiveKpis = {
"table": ["columnCount", "rowCount", "headers"], # Only for tables
"bullet_list": ["itemCount"], # Only for bullet_list
"list": ["itemCount"] # Only for list
}
excludedKpis = []
for kpiType, kpiFields in typeExclusiveKpis.items():
if kpiType != contentType:
excludedKpis.extend(kpiFields)
for key, value in section.items():
if key not in sectionSummary and key not in ["elements"] and key not in excludedKpis:
# Don't copy type-specific KPIs if they're 0/empty and we didn't extract them ourselves
# This prevents copying columnCount: 0, rowCount: 0, headers: [] from structure generation phase
if key in ["columnCount", "rowCount", "headers", "itemCount"]:
# Skip if it's 0/empty - we'll only include KPIs we extracted from elements
if isinstance(value, int) and value == 0:
continue
if isinstance(value, list) and len(value) == 0:
continue
# Include simple types (str, int, float, bool, list of primitives)
if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10):
sectionSummary[key] = value
summary["sections"].append(sectionSummary)
else:
# Fallback: check for sections directly in root
sections = jsonData.get("sections", [])
if sections:
summary["statistics"]["sectionCount"] = len(sections)
for section in sections:
sectionSummary = {
"id": section.get("id"),
"content_type": section.get("content_type"),
"title": section.get("title"),
"order": section.get("order")
}
# Get elements for processing
elements = section.get("elements", [])
if section.get("content_type") == "table":
if elements and isinstance(elements, list) and len(elements) > 0:
tableElement = elements[0]
content = tableElement.get("content", {})
if isinstance(content, dict):
headers = content.get("headers", [])
rows = content.get("rows", [])
else:
headers = tableElement.get("headers", [])
rows = tableElement.get("rows", [])
if headers:
sectionSummary["columnCount"] = len(headers)
sectionSummary["headers"] = headers
if rows:
sectionSummary["rowCount"] = len(rows)
sectionSummary["caption"] = tableElement.get("caption") or (content.get("caption") if isinstance(content, dict) else None)
# For lists and bullet_lists: extract item count
elif section.get("content_type") in ["list", "bullet_list"]:
if elements and isinstance(elements, list) and len(elements) > 0:
listElement = elements[0]
content = listElement.get("content", {})
if isinstance(content, dict):
items = content.get("items", [])
else:
items = listElement.get("items", [])
if items:
sectionSummary["itemCount"] = len(items)
# For paragraphs/headings: extract text statistics (no preview for security)
elif section.get("content_type") in ["paragraph", "heading"]:
if elements and isinstance(elements, list) and len(elements) > 0:
textElement = elements[0]
content = textElement.get("content", {})
if isinstance(content, dict):
text = content.get("text", "")
else:
text = textElement.get("text", "")
if text:
sectionSummary["textLength"] = len(text)
sectionSummary["wordCount"] = len(text.split())
if section.get("textLength"):
sectionSummary["textLength"] = section.get("textLength")
# For code blocks: extract code statistics (no preview for security)
elif section.get("content_type") == "code_block":
if elements and isinstance(elements, list) and len(elements) > 0:
codeElement = elements[0]
content = codeElement.get("content", {})
if isinstance(content, dict):
code = content.get("code", "")
language = content.get("language", "")
if code:
sectionSummary["codeLength"] = len(code)
sectionSummary["codeLineCount"] = code.count('\n') + 1
if language:
sectionSummary["language"] = language
# Wenn contentPartIds vorhanden sind, aber keine elements: Füge ContentParts-Metadaten hinzu
contentPartIds = section.get("contentPartIds", [])
if contentPartIds and not elements:
# Prüfe ob contentPartsMetadata vorhanden ist
contentPartsMetadata = section.get("contentPartsMetadata", [])
if contentPartsMetadata:
sectionSummary["contentPartsMetadata"] = contentPartsMetadata
else:
# Fallback: Zeige nur IDs wenn Metadaten nicht verfügbar
sectionSummary["contentPartIds"] = contentPartIds
sectionSummary["note"] = "ContentParts referenced but metadata not available"
# Include any additional fields from section (generic approach)
# BUT exclude type-specific KPIs that don't belong to this content_type
contentType = section.get("content_type", "")
# Define KPIs that are ONLY valid for specific types
typeExclusiveKpis = {
"table": ["columnCount", "rowCount", "headers"], # Only for tables
"bullet_list": ["itemCount"], # Only for bullet_list
"list": ["itemCount"] # Only for list
}
excludedKpis = []
for kpiType, kpiFields in typeExclusiveKpis.items():
if kpiType != contentType:
excludedKpis.extend(kpiFields)
for key, value in section.items():
if key not in sectionSummary and key not in ["elements"] and key not in excludedKpis:
# Don't copy type-specific KPIs if they're 0/empty and we didn't extract them ourselves
# This prevents copying columnCount: 0, rowCount: 0, headers: [] from structure generation phase
if key in ["columnCount", "rowCount", "headers", "itemCount"]:
# Skip if it's 0/empty - we'll only include KPIs we extracted from elements
if isinstance(value, int) and value == 0:
continue
if isinstance(value, list) and len(value) == 0:
continue
# Include simple types (str, int, float, bool, list of primitives)
if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10):
sectionSummary[key] = value
summary["sections"].append(sectionSummary)
# Extract statistics from root level (generic - include all statistics fields)
rootStatistics = jsonData.get("statistics", {})
if rootStatistics and isinstance(rootStatistics, dict):
# Merge root statistics into summary statistics
summary["statistics"].update(rootStatistics)
return summary
except Exception as e:
logger.warning(f"Error summarizing JSON structure: {str(e)}")
return {"error": str(e), "type": "error"}
def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
"""
Analyze documents for validation - includes metadata AND JSON structure summary.
JSON summary provides structure information (sections, tables with captions, IDs) without full content.
"""
if not documents:
return []
summaries = []
for doc in documents:
try:
name = getattr(doc, 'documentName', 'Unknown')
mimeType = getattr(doc, 'mimeType', 'unknown')
formatExt = self._detectFormat(doc)
sizeInfo = self._calculateSize(doc)
summary = {
"name": name,
"mimeType": mimeType,
"format": formatExt,
"size": sizeInfo["readable"]
}
# Extract JSON structure summary - prioritize sourceJson for rendered documents
sourceJson = getattr(doc, 'sourceJson', None)
data = getattr(doc, 'documentData', None)
# WICHTIG: For rendered documents (HTML, PDF, DOCX, etc.), jsonStructure is METADATA about the structure,
# NOT the actual rendered content. The actual content is in documentData.
# Include both: jsonStructure for structure metadata, and contentPreview for actual content check
if sourceJson and isinstance(sourceJson, dict):
# Use source JSON for structure analysis (for rendered documents like xlsx/docx/pdf)
jsonSummary = self._summarizeJsonStructure(sourceJson)
summary["jsonStructure"] = jsonSummary
# Add note that this is metadata, not actual content
summary["note"] = "jsonStructure contains metadata about document structure. Actual rendered content is in documentData."
# For rendered documents, also check actual content
if data is not None:
contentPreview = self._getContentPreview(data, formatExt, mimeType)
if contentPreview:
summary["contentPreview"] = contentPreview
elif data is not None:
# Fallback: try to parse documentData as JSON (for non-rendered documents)
if isinstance(data, dict):
# Summarize JSON structure
jsonSummary = self._summarizeJsonStructure(data)
summary["jsonStructure"] = jsonSummary
elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
# Handle list of documents
jsonSummary = self._summarizeJsonStructure(data[0])
summary["jsonStructure"] = jsonSummary
else:
# For non-JSON data (e.g., rendered HTML), get content preview
contentPreview = self._getContentPreview(data, formatExt, mimeType)
if contentPreview:
summary["contentPreview"] = contentPreview
summaries.append(summary)
except Exception as e:
logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
summaries.append({
"name": getattr(doc, 'documentName', 'Unknown'),
"mimeType": getattr(doc, 'mimeType', 'unknown'),
"format": "unknown",
"size": "0 B",
"error": str(e)
})
return summaries
def _detectFormat(self, doc: Any) -> str:
"""Extract format from filename extension (always use extension)"""
try:
docName = getattr(doc, 'documentName', '')
# Extract from filename extension
if docName and '.' in docName:
ext = docName.rsplit('.', 1)[1].lower()
return ext
return 'unknown'
except Exception as e:
logger.warning(f"Error detecting format: {str(e)}")
return 'unknown'
def _calculateSize(self, doc: Any) -> Dict[str, Any]:
"""Calculate document size in bytes and human-readable format"""
try:
if not hasattr(doc, 'documentData') or doc.documentData is None:
return {"bytes": 0, "readable": "0 B"}
data = doc.documentData
size_bytes = 0
if isinstance(data, str):
size_bytes = len(data.encode('utf-8'))
elif isinstance(data, bytes):
size_bytes = len(data)
elif isinstance(data, (dict, list)):
# Estimate JSON size
try:
json_str = json.dumps(data)
size_bytes = len(json_str.encode('utf-8'))
except:
size_bytes = len(str(data).encode('utf-8'))
else:
size_bytes = len(str(data).encode('utf-8'))
# Convert to human-readable format
readable = self._formatBytes(size_bytes)
return {"bytes": size_bytes, "readable": readable}
except Exception as e:
logger.warning(f"Error calculating size: {str(e)}")
return {"bytes": 0, "readable": "0 B"}
def _formatBytes(self, bytes: int) -> str:
"""Format bytes to human-readable string"""
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes < 1024.0:
return f"{bytes:.1f} {unit}"
bytes /= 1024.0
return f"{bytes:.1f} TB"
def _getContentPreview(self, data: Any, formatExt: str, mimeType: str) -> Optional[Dict[str, Any]]:
"""Get structural validation info for rendered documents (generic, NO content preview for security/privacy)
Returns metadata about document structure to help validation distinguish between:
- Structure metadata (jsonStructure) - describes what should be rendered
- Actual rendered content (documentData) - the actual document file
Does NOT expose actual content, only structural indicators.
"""
try:
if data is None:
return None
preview = {}
# Generic content type detection
if isinstance(data, bytes):
preview["dataType"] = "bytes"
preview["contentLength"] = len(data)
# Check if it's likely text-based (for text formats like HTML, TXT, etc.)
try:
# Try to decode as UTF-8 to check if it's text-based
decoded = data.decode('utf-8', errors='strict')
preview["isTextBased"] = True
preview["contentLength"] = len(decoded)
# For text-based formats, check if it looks like rendered content vs JSON metadata
# JSON metadata typically starts with { or [ and contains structure keywords
trimmed = decoded.strip()
looksLikeJson = (trimmed.startswith('{') or trimmed.startswith('[')) and \
('"sections"' in trimmed or '"contentPartIds"' in trimmed or '"generationHint"' in trimmed)
preview["looksLikeRenderedContent"] = not looksLikeJson
except UnicodeDecodeError:
# Not valid UTF-8, likely binary (PDF, DOCX, images, etc.)
preview["isTextBased"] = False
preview["isBinary"] = True
# Binary files with content are rendered (not metadata)
preview["looksLikeRenderedContent"] = True
elif isinstance(data, str):
preview["dataType"] = "string"
preview["isTextBased"] = True
preview["contentLength"] = len(data)
# Check if it looks like rendered content vs JSON metadata
trimmed = data.strip()
looksLikeJson = (trimmed.startswith('{') or trimmed.startswith('[')) and \
('"sections"' in trimmed or '"contentPartIds"' in trimmed or '"generationHint"' in trimmed)
preview["looksLikeRenderedContent"] = not looksLikeJson
elif isinstance(data, (dict, list)):
# If documentData is still a dict/list, it's likely structure metadata, not rendered content
preview["dataType"] = "json"
preview["isTextBased"] = True
preview["looksLikeRenderedContent"] = False
preview["note"] = "documentData is JSON structure, not rendered document file"
else:
preview["dataType"] = type(data).__name__
preview["contentLength"] = len(str(data)) if hasattr(data, '__len__') else 0
return preview if preview else None
except Exception as e:
logger.warning(f"Error getting content structure info: {str(e)}")
return None
def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
"""
Generic format compatibility check.
- txt/md/html are text formats (compatible with each other)
- pdf/docx/xlsx are document formats (not compatible with each other)
- json/xml are structured formats
- images are image formats
"""
deliveredLower = deliveredFormat.lower()
expectedLower = expectedFormat.lower()
# Exact match
if deliveredLower == expectedLower:
return True
# Text formats are interchangeable
textFormats = ['txt', 'md', 'html', 'text', 'plain']
if deliveredLower in textFormats and expectedLower in textFormats:
return True
# Structured formats
if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
return True
# Document formats are NOT compatible with each other
documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
if deliveredLower in documentFormats and expectedLower in documentFormats:
return False # pdf ≠ docx
return False
async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None, actionParameters: Optional[Dict[str, Any]] = None, actionHistory: Optional[List[Dict[str, Any]]] = None, context: Optional[Any] = None) -> Dict[str, Any]:
    """AI-based comprehensive validation - generic approach.

    Assembles a metadata-only validation prompt (task objective, expected
    formats, action context/parameters/history, current-round document index,
    document summaries), sends it to the planning AI, and parses the JSON
    verdict. Single attempt: an empty or unparseable response raises
    ValueError rather than retrying.

    Returns:
        Normalized result dict (overallSuccess, qualityScore, criteriaMapping,
        gap info, ...). Missing mandatory fields are recorded in
        "missingFields" and flip "schemaCompliant" to False.

    Raises:
        ValueError: empty AI response, no JSON found, or invalid JSON.
        Exception: any other failure is logged and re-raised.
    """
    try:
        # Bail out early when no AI service is wired in
        if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
            return self._createFailedValidationResult("AI service not available")
        # Use taskStep.objective if available, otherwise fall back to workflow intent.
        # taskStep may be an object (attribute access) or a plain dict.
        taskObjective = None
        if taskStep and hasattr(taskStep, 'objective'):
            taskObjective = taskStep.objective
        elif taskStep and isinstance(taskStep, dict):
            taskObjective = taskStep.get('objective')
        # Use taskStep format fields if available, otherwise fall back to intent
        dataType = None
        expectedFormats = None
        if taskStep:
            if hasattr(taskStep, 'dataType') and taskStep.dataType:
                dataType = taskStep.dataType
            elif isinstance(taskStep, dict):
                dataType = taskStep.get('dataType')
            if hasattr(taskStep, 'expectedFormats') and taskStep.expectedFormats:
                expectedFormats = taskStep.expectedFormats
            elif isinstance(taskStep, dict):
                expectedFormats = taskStep.get('expectedFormats')
        # Fallback to intent if taskStep format fields not available
        if not dataType:
            dataType = intent.get('dataType', 'unknown')
        if not expectedFormats:
            expectedFormats = intent.get('expectedFormats', [])
        # Determine objective text and label (workflow-level intent as fallback)
        workflowIntent = getattr(self.services.workflow, '_workflowIntent', {}) if hasattr(self.services, 'workflow') and self.services.workflow else {}
        intentText = workflowIntent.get('intent', 'Unknown')
        objectiveText = taskObjective if taskObjective else intentText
        objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"
        # Build prompt base WITHOUT document summaries first.
        # Use success criteria from taskStep if available, otherwise from intent.
        successCriteria = []
        if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
            successCriteria = taskStep.successCriteria
        elif taskStep and isinstance(taskStep, dict):
            successCriteria = taskStep.get('successCriteria', [])
        else:
            successCriteria = intent.get('successCriteria', [])
        # NOTE(review): criteriaCount is currently unused below — confirm
        # whether it can be removed or was meant to appear in the prompt.
        criteriaCount = len(successCriteria)
        # Build action name context with human-readable description
        actionContext = ""
        if actionName:
            # Convert action name to human-readable format (e.g. "ai.webResearch" -> "Webresearch")
            actionDescription = actionName.replace("ai.", "").replace(".", " ").title()
            actionContext = f"\nDOCUMENTS CREATED BY: {actionDescription} ({actionName})"
        # Build action parameters context
        actionParamsContext = ""
        if actionParameters and isinstance(actionParameters, dict) and len(actionParameters) > 0:
            # Filter out documentList and other large/redundant parameters for clarity
            relevantParams = {k: v for k, v in actionParameters.items()
                              if k not in ['documentList', 'connections'] and v is not None}
            if relevantParams:
                paramsJson = json.dumps(relevantParams, ensure_ascii=False, indent=2)
                actionParamsContext = f"\nACTION PARAMETERS USED: {paramsJson}"
        # Extract validation metadata from documents (action-specific context)
        validationMetadataContext = ""
        if documents:
            metadataList = []
            for doc in documents:
                metadata = getattr(doc, 'validationMetadata', None)
                if metadata and isinstance(metadata, dict):
                    metadataList.append(metadata)
            if metadataList:
                # Combine all metadata (usually just one document); later
                # documents overwrite duplicate keys from earlier ones.
                combinedMetadata = {}
                for meta in metadataList:
                    combinedMetadata.update(meta)
                if combinedMetadata:
                    metadataJson = json.dumps(combinedMetadata, ensure_ascii=False, indent=2)
                    validationMetadataContext = f"\nACTION VALIDATION METADATA: {metadataJson}"
        # Build action history context (for multi-step workflow validation)
        actionHistoryContext = ""
        if actionHistory and isinstance(actionHistory, list) and len(actionHistory) > 0:
            historyEntries = []
            for entry in actionHistory:
                if isinstance(entry, dict):
                    action = entry.get('action', 'unknown')
                    params = entry.get('parameters', {}) or {}
                    step = entry.get('step', 0)
                    # Filter out documentList for clarity
                    relevantParams = {k: v for k, v in params.items() if k not in ['documentList', 'connections'] and v is not None}
                    paramsStr = json.dumps(relevantParams, ensure_ascii=False) if relevantParams else "{}"
                    historyEntries.append(f"Step {step}: {action} {paramsStr}")
                elif isinstance(entry, str):
                    historyEntries.append(entry)
            if historyEntries:
                actionHistoryContext = f"\n\n=== ACTION HISTORY ===\n" + "\n".join(f"- {entry}" for entry in historyEntries)
                actionHistoryContext += "\n\nIMPORTANT: This shows the complete workflow that produced the documents. For process-oriented criteria (e.g., 'internet search performed'), check ACTION HISTORY first. Document metadata may only reflect the LAST action, not the entire workflow."
        # Build document index context (all documents delivered in current round)
        documentIndexContext = ""
        if context and self.services and hasattr(self.services, 'chat') and hasattr(self.services, 'workflow') and self.services.workflow:
            try:
                documentIndex = self.services.chat.getAvailableDocuments(self.services.workflow)
                if documentIndex and documentIndex.strip() and documentIndex != "No documents available":
                    # Extract only the "Current round documents" section if present
                    lines = documentIndex.split('\n')
                    currentRoundSection = []
                    inCurrentRound = False
                    for line in lines:
                        if "Current round documents:" in line:
                            inCurrentRound = True
                            currentRoundSection.append(line)
                        elif inCurrentRound:
                            # NOTE(review): the `" - docItem:"` prefix test can never
                            # match after strip() removed leading whitespace — confirm
                            # intent; the final else appends such lines anyway.
                            if line.strip().startswith("- docList:") or line.strip().startswith(" - docItem:") or line.strip().startswith("- docItem:"):
                                currentRoundSection.append(line)
                            elif line.strip() == "":
                                # Empty line is okay, continue
                                continue
                            elif "Past rounds documents:" in line or "AVAILABLE_CONNECTIONS_INDEX:" in line:
                                # End of current round section
                                break
                            else:
                                # Still in current round section
                                currentRoundSection.append(line)
                    if currentRoundSection:
                        documentIndexContext = "\n\n=== ALL DOCUMENTS DELIVERED IN CURRENT ROUND ===\n" + "\n".join(currentRoundSection)
                        documentIndexContext += "\n\nIMPORTANT: This shows ALL documents that have been delivered in the current round, not just the ones being validated in this step. Use this to check if all required formats/documents are present across the entire round."
            except Exception as e:
                logger.warning(f"Error extracting document index for validation: {str(e)}")
                # Continue without document index - not critical
        # Transform criteria that require data access into metadata-only checks
        transformedCriteria = self._transformCriteriaForMetadataOnly(successCriteria)
        # Format success criteria for display with index numbers
        if transformedCriteria:
            criteriaDisplay = "\n".join([f"[{i}] {criterion}" for i, criterion in enumerate(transformedCriteria)])
        else:
            criteriaDisplay = "[]"
        # The prompt text below is behavior — do not edit its wording.
        promptBase = f"""TASK VALIDATION
=== TASK INFORMATION ===
{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMATS: {expectedFormats if expectedFormats else ['any']}{actionContext}{actionParamsContext}{validationMetadataContext}{actionHistoryContext}{documentIndexContext}
=== VALIDATION INSTRUCTIONS ===
CRITICAL: Validate ONLY metadata/structure. Documents may be binary (PDF, DOCX, images) or very large (200MB+). NEVER try to read or validate actual content values.
VALIDATION RULES:
1. METADATA ONLY: Use jsonStructure (sections, contentPartIds, content_type, statistics) and contentPreview (dataType, contentLength, looksLikeRenderedContent) for validation. These are METADATA indicators, NOT actual content.
2. FORMAT VALIDATION: Check mimeType/format metadata only. Do NOT inspect content to determine format. Format mismatch = wrong_format gap.
3. CONTENT EXISTENCE: Use contentPreview.looksLikeRenderedContent=true to confirm content exists. Use jsonStructure.content_type to confirm data types exist (e.g., "image" section = image exists, "bullet_list" section = bullet list exists, "table" section = table exists). If a section with a content_type exists, the content has been delivered. Do NOT assume content was AI-generated vs extracted - if the section exists, it was delivered.
4. STRUCTURE VALIDATION: Use jsonStructure.sections, statistics (counts, rowCount, columnCount, itemCount) as evidence. Trust structure metadata over format claims. Only check KPIs if they are present (missing KPIs mean elements not yet populated, not that content is missing).
5. PROCESS VALIDATION: Use ACTION HISTORY for process-oriented criteria (e.g., "search performed", "extraction done").
6. ONE CRITERION PER EVALUATION: Evaluate each criterion independently. Do not mention other criteria.
7. NO ASSUMPTIONS: Do NOT assume content was AI-generated vs extracted. If a section exists with content_type, the content was delivered. Only validate what is present in the metadata.
8. DATA-LEVEL CRITERIA TRANSFORMATION: Criteria mentioning accuracy percentages (e.g., "95% accuracy"), completeness percentages (e.g., "98% completeness"), or "all X extracted" have been transformed to metadata-only checks. For accuracy/completeness: Check if contentPartIds reference all source documents and if structure metadata shows expected data types (tables, lists, etc.) exist. For "all X extracted": Check if contentPartIds reference all source documents mentioned in ACTION HISTORY or document index. NEVER attempt to verify accuracy/completeness by comparing actual data values - only use metadata indicators.
VALIDATION STEPS:
- Check ACTION HISTORY for process-oriented criteria
- Check jsonStructure metadata (sections, content_type, statistics) for structure validation
- Check contentPreview.looksLikeRenderedContent for content existence (not quality)
- Check mimeType/format for format validation
- NEVER try to read actual content values (binary files, large files, data accuracy)
- Classify gaps: missing_data, incomplete_data, wrong_structure, wrong_format
SCORING:
- Data complete + structure matches → qualityScore: 0.9-1.0
- Data complete but format issues → qualityScore: 0.7-0.9
- Missing/incomplete data → qualityScore: <0.7
- Format mismatch only (data present) → qualityScore: 0.6-0.7
SUGGESTIONS:
- ONE suggestion per UNMET criterion, ordered by criteriaMapping index
- Reference actual structure values found and required values
- Calculate quantitative gaps when numbers are available
- Be specific and actionable based on structure evidence
=== OUTPUT FORMAT ===
{{
"overallSuccess": false,
"qualityScore": 0.0,
"dataTypeMatch": false,
"formatMatch": false,
"documentCount": {len(documents)},
"criteriaMapping": [
{{
"index": 0,
"criterion": "exact_criterion_text",
"met": false,
"reason": "explanation_for_this_criterion_only"
}}
],
"gapAnalysis": "Brief gap summary",
"gapType": "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap",
"structureComparison": {{
"required": {{}},
"found": {{}},
"gap": {{}}
}},
"improvementSuggestions": ["One suggestion per unmet criterion"],
"validationDetails": [
{{
"documentName": "name.ext",
"issues": ["Specific issue"],
"suggestions": ["Specific fix"]
}}
]
}}
=== DATA ===
SUCCESS CRITERIA TO VALIDATE in criteriaMapping array:
{criteriaDisplay}
DELIVERED DOCUMENTS ({len(documents)} items):
"""
        # Analyze documents (metadata-only summaries)
        documentSummaries = self._analyzeDocuments(documents)
        # Build final prompt with summaries at the end
        documentsJson = json.dumps(documentSummaries, indent=2, ensure_ascii=False)
        validationPrompt = promptBase + documentsJson
        # Call AI service for validation
        response = await self.services.ai.callAiPlanning(
            prompt=validationPrompt,
            placeholders=None,
            debugType="contentvalidation"
        )
        if not response or not response.strip():
            logger.warning("AI validation returned empty response")
            raise ValueError("AI validation failed - empty response")
        # Clean and extract JSON from response using proper JSON extraction utility.
        # This handles nested structures and markdown code blocks correctly.
        result = response.strip()
        logger.debug(f"AI validation response length: {len(result)}")
        # Extract JSON first
        extractedJson = self.services.utils.jsonExtractString(result)
        if not extractedJson:
            logger.debug(f"No JSON found in AI response: {result[:200]}...")
            logger.debug(f"Full AI response: {result}")
            raise ValueError("AI validation failed - no JSON in response")
        # Proactively fix Python-style booleans (False/True -> false/true) BEFORE parsing.
        # Word boundaries keep substrings inside longer identifiers/strings intact,
        # and handle booleans in any context: standalone, in lists, in dicts, etc.
        normalizedJson = re.sub(r'\bFalse\b', 'false', extractedJson)
        normalizedJson = re.sub(r'\bTrue\b', 'true', normalizedJson)
        logger.debug(f"Extracted JSON (before normalization): {extractedJson[:200]}...")
        logger.debug(f"Normalized JSON (after boolean fix): {normalizedJson[:200]}...")
        # Now try to parse the normalized JSON
        try:
            aiResult = json.loads(normalizedJson)
            logger.info("AI validation JSON parsed successfully")
        except json.JSONDecodeError as json_error:
            logger.warning(f"AI validation invalid JSON after normalization: {str(json_error)}")
            logger.debug(f"JSON content that failed: {normalizedJson[:500]}...")
            raise ValueError(f"AI validation failed - invalid JSON: {str(json_error)}")
        overall = aiResult.get("overallSuccess")
        quality = aiResult.get("qualityScore")
        details = aiResult.get("validationDetails")
        gap = aiResult.get("gapAnalysis", "")
        improvements = aiResult.get("improvementSuggestions", [])
        gap_type = aiResult.get("gapType", "")
        structure_comp = aiResult.get("structureComparison", {})
        criteria_mapping = aiResult.get("criteriaMapping", [])
        # Normalize while keeping failures explicit: wrong-typed mandatory
        # fields become None and are flagged in missingFields below.
        normalized = {
            "overallSuccess": overall if isinstance(overall, bool) else None,
            "qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
            "documentCount": len(documentSummaries),
            "gapAnalysis": gap if gap else "",
            "gapType": gap_type if gap_type else "",
            "structureComparison": structure_comp if structure_comp else {},
            "criteriaMapping": criteria_mapping if isinstance(criteria_mapping, list) else [],
            "validationDetails": details if isinstance(details, list) else [{
                "documentName": "AI Validation",
                "gapAnalysis": gap
            }],
            "improvementSuggestions": improvements,
            "schemaCompliant": True,
            "originalType": "json",
            "missingFields": []
        }
        if normalized["overallSuccess"] is None:
            normalized["missingFields"].append("overallSuccess")
        if normalized["qualityScore"] is None:
            normalized["missingFields"].append("qualityScore")
        if normalized["missingFields"]:
            normalized["schemaCompliant"] = False
        return normalized
    except Exception as e:
        logger.error(f"AI validation failed: {str(e)}")
        raise
def _transformCriteriaForMetadataOnly(self, criteria: List[str]) -> List[str]:
"""
Transform criteria that require data access into metadata-only checks.
Preserves original criterion intent while converting data-level checks to metadata checks.
Examples:
- "95% accuracy""[METADATA ONLY] Data structure indicates extraction completed (check contentPartIds reference all source documents)"
- "98% completeness""[METADATA ONLY] All source documents referenced in contentPartIds (verify source count matches)"
- "all transactions extracted""[METADATA ONLY] All source documents referenced in contentPartIds (verify source count matches)"
"""
if not criteria:
return []
transformed = []
for criterion in criteria:
original = criterion.strip()
transformed_criterion = original
# Pattern: accuracy percentage (e.g., "95% accuracy", "accuracy meets or exceeds 95% threshold")
if re.search(r'\d+%?\s*accuracy|accuracy.*\d+%', original, re.IGNORECASE):
# Extract the main subject (e.g., "transactions", "data", etc.)
subject_match = re.search(r'(transactions?|data|items?|records?|entries?)', original, re.IGNORECASE)
subject = subject_match.group(1).lower() if subject_match else "data"
transformed_criterion = f"[METADATA ONLY] {original}: Check that contentPartIds reference all source documents and jsonStructure shows expected {subject} structure exists (tables/lists with rowCount/itemCount > 0). Cannot verify actual {subject} accuracy values from metadata."
# Pattern: completeness percentage or "all X extracted" (e.g., "98% completeness", "all transactions extracted")
elif re.search(r'\d+%?\s*completeness|completeness.*\d+%|all\s+.*extracted|extract.*all', original, re.IGNORECASE):
# Extract the main subject
subject_match = re.search(r'(transactions?|data|items?|records?|entries?|statements?|documents?)', original, re.IGNORECASE)
subject = subject_match.group(1).lower() if subject_match else "items"
transformed_criterion = f"[METADATA ONLY] {original}: Verify that contentPartIds reference all source documents mentioned in ACTION HISTORY/document index, and jsonStructure shows {subject} structure exists (check rowCount/itemCount in tables/lists). Cannot verify actual {subject} count from metadata."
# Pattern: "no missing data" or "no incorrect data"
elif re.search(r'no\s+missing|no\s+incorrect|no\s+errors?', original, re.IGNORECASE):
transformed_criterion = f"[METADATA ONLY] {original}: Check that jsonStructure.content_type shows expected data types present (tables, lists, etc.) and contentPreview.looksLikeRenderedContent=true. Cannot verify actual data values from metadata."
# Pattern: data accuracy without percentage (e.g., "data is accurate", "accurate data")
elif re.search(r'data.*accurate|accurate.*data', original, re.IGNORECASE) and '%' not in original:
transformed_criterion = f"[METADATA ONLY] {original}: Check that contentPartIds reference source documents and jsonStructure shows expected data structure exists. Cannot verify actual data accuracy values from metadata."
transformed.append(transformed_criterion)
return transformed
def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
"""Create a standardized failed validation result"""
return {
"overallSuccess": False,
"qualityScore": 0.0,
"dataTypeMatch": False,
"formatMatch": False,
"documentCount": 0,
"criteriaMapping": [],
"gapAnalysis": errorMessage,
"improvementSuggestions": [],
"validationDetails": [],
"schemaCompliant": True,
"originalType": "error",
"missingFields": [],
"error": errorMessage
}