# File stats (from source listing): 1048 lines, 60 KiB, Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
# contentValidator.py
|
|
# Content validation for adaptive Dynamic mode
|
|
# Generic, document-aware validation system
|
|
|
|
import logging
|
|
import json
|
|
import base64
|
|
import re
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration constants
|
|
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024 # 50KB threshold
|
|
PREVIEW_SAMPLE_SIZE = 1024 # 1KB preview for large documents
|
|
|
|
|
|
class ContentValidator:
|
|
"""Validates delivered content against user intent - generic and document-aware"""
|
|
|
|
def __init__(self, services=None, learningEngine=None):
|
|
self.services = services
|
|
self.learningEngine = learningEngine
|
|
|
|
async def validateContent(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None, actionParameters: Optional[Dict[str, Any]] = None, actionHistory: Optional[List[Dict[str, Any]]] = None, context: Optional[Any] = None) -> Dict[str, Any]:
    """Validates delivered content against user intent using AI (single attempt; parse-or-fail)

    Thin public entry point: all work is delegated to _validateWithAI with the
    arguments passed through unchanged.

    Args:
        documents: List of documents to validate
        intent: Workflow-level intent dict (for format requirements)
        taskStep: Optional TaskStep object (preferred source for objective)
        actionName: Optional action name (e.g., "ai.process", "ai.webResearch") that created the documents
        actionParameters: Optional action parameters used during execution (e.g., {"columnsPerRow": 10, "researchDepth": "deep"})
        actionHistory: Optional list of previously executed actions in the workflow (for multi-step workflow context)
        context: Optional context object to access all documents delivered in the current round

    Returns:
        The validation-result dict produced by _validateWithAI.
    """
    return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters, actionHistory, context)
|
|
|
|
def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
|
|
"""Summarize JSON document structure for validation - extracts main objects, statistics, captions, and IDs."""
|
|
try:
|
|
if not isinstance(jsonData, dict):
|
|
return {"type": "non-dict", "preview": str(jsonData)[:200]}
|
|
|
|
summary = {
|
|
"metadata": {},
|
|
"sections": [],
|
|
"statistics": {}
|
|
}
|
|
|
|
# Extract metadata - include ALL metadata fields (generic for all action types)
|
|
metadata = jsonData.get("metadata", {})
|
|
if metadata and isinstance(metadata, dict):
|
|
# Include all metadata fields, not just specific ones
|
|
summary["metadata"] = dict(metadata)
|
|
|
|
# Extract documents array (if present)
|
|
documents = jsonData.get("documents", [])
|
|
if documents:
|
|
summary["statistics"]["documentCount"] = len(documents)
|
|
# Process first document (most common case)
|
|
if len(documents) > 0:
|
|
doc = documents[0]
|
|
docSections = doc.get("sections", [])
|
|
summary["statistics"]["sectionCount"] = len(docSections)
|
|
|
|
# Summarize sections
|
|
for section in docSections:
|
|
sectionSummary = {
|
|
"id": section.get("id"),
|
|
"content_type": section.get("content_type"),
|
|
"title": section.get("title"),
|
|
"order": section.get("order")
|
|
}
|
|
|
|
# Get elements for processing
|
|
elements = section.get("elements", [])
|
|
|
|
# For tables: extract caption and statistics
|
|
if section.get("content_type") == "table":
|
|
# Try to extract from elements first
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
tableElement = elements[0]
|
|
# Ensure tableElement is a dictionary before accessing
|
|
if isinstance(tableElement, dict):
|
|
content = tableElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
headers = content.get("headers", [])
|
|
rows = content.get("rows", [])
|
|
else:
|
|
headers = tableElement.get("headers", [])
|
|
rows = tableElement.get("rows", [])
|
|
if headers:
|
|
sectionSummary["columnCount"] = len(headers)
|
|
sectionSummary["headers"] = headers # Include headers for context
|
|
if rows:
|
|
sectionSummary["rowCount"] = len(rows)
|
|
sectionSummary["caption"] = tableElement.get("caption") or (content.get("caption") if isinstance(content, dict) else None)
|
|
else:
|
|
# Fallback: extract KPIs from section metadata if elements are missing
|
|
# This handles cases where filledStructure doesn't have elements populated
|
|
if "columnCount" in section:
|
|
sectionSummary["columnCount"] = section.get("columnCount")
|
|
if "rowCount" in section:
|
|
sectionSummary["rowCount"] = section.get("rowCount")
|
|
if "headers" in section:
|
|
sectionSummary["headers"] = section.get("headers")
|
|
if "caption" in section:
|
|
sectionSummary["caption"] = section.get("caption")
|
|
|
|
# For lists and bullet_lists: extract item count
|
|
elif section.get("content_type") in ["list", "bullet_list"]:
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
listElement = elements[0]
|
|
# Ensure listElement is a dictionary before accessing
|
|
if isinstance(listElement, dict):
|
|
content = listElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
items = content.get("items", [])
|
|
else:
|
|
items = listElement.get("items", [])
|
|
if items:
|
|
sectionSummary["itemCount"] = len(items)
|
|
|
|
# For paragraphs/headings: extract text statistics (no preview for security)
|
|
elif section.get("content_type") in ["paragraph", "heading"]:
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
textElement = elements[0]
|
|
# Ensure textElement is a dictionary before accessing
|
|
if isinstance(textElement, dict):
|
|
content = textElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
text = content.get("text", "")
|
|
else:
|
|
text = textElement.get("text", "")
|
|
if text:
|
|
sectionSummary["textLength"] = len(text)
|
|
sectionSummary["wordCount"] = len(text.split())
|
|
# Also check for text length if available directly in section
|
|
if section.get("textLength"):
|
|
sectionSummary["textLength"] = section.get("textLength")
|
|
|
|
# For code blocks: extract code statistics (no preview for security)
|
|
elif section.get("content_type") == "code_block":
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
codeElement = elements[0]
|
|
content = codeElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
code = content.get("code", "")
|
|
language = content.get("language", "")
|
|
if code:
|
|
sectionSummary["codeLength"] = len(code)
|
|
sectionSummary["codeLineCount"] = code.count('\n') + 1
|
|
if language:
|
|
sectionSummary["language"] = language
|
|
|
|
# Wenn contentPartIds vorhanden sind, aber keine elements: Füge ContentParts-Metadaten hinzu
|
|
contentPartIds = section.get("contentPartIds", [])
|
|
if contentPartIds and not elements:
|
|
# Prüfe ob contentPartsMetadata vorhanden ist
|
|
contentPartsMetadata = section.get("contentPartsMetadata", [])
|
|
if contentPartsMetadata:
|
|
sectionSummary["contentPartsMetadata"] = contentPartsMetadata
|
|
else:
|
|
# Fallback: Zeige nur IDs wenn Metadaten nicht verfügbar
|
|
sectionSummary["contentPartIds"] = contentPartIds
|
|
sectionSummary["note"] = "ContentParts referenced but metadata not available"
|
|
|
|
# Include any additional fields from section (generic approach)
|
|
# This ensures all action-specific fields are preserved
|
|
# BUT exclude type-specific KPIs that don't belong to this content_type
|
|
contentType = section.get("content_type", "")
|
|
# Define KPIs that are ONLY valid for specific types
|
|
typeExclusiveKpis = {
|
|
"table": ["columnCount", "rowCount", "headers"], # Only for tables
|
|
"bullet_list": ["itemCount"], # Only for bullet_list
|
|
"list": ["itemCount"] # Only for list
|
|
}
|
|
excludedKpis = []
|
|
for kpiType, kpiFields in typeExclusiveKpis.items():
|
|
if kpiType != contentType:
|
|
excludedKpis.extend(kpiFields)
|
|
|
|
for key, value in section.items():
|
|
if key not in sectionSummary and key not in ["elements"] and key not in excludedKpis:
|
|
# Don't copy type-specific KPIs if they're 0/empty and we didn't extract them ourselves
|
|
# This prevents copying columnCount: 0, rowCount: 0, headers: [] from structure generation phase
|
|
if key in ["columnCount", "rowCount", "headers", "itemCount"]:
|
|
# Skip if it's 0/empty - we'll only include KPIs we extracted from elements
|
|
if isinstance(value, int) and value == 0:
|
|
continue
|
|
if isinstance(value, list) and len(value) == 0:
|
|
continue
|
|
|
|
# Include simple types (str, int, float, bool, list of primitives)
|
|
if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10):
|
|
sectionSummary[key] = value
|
|
|
|
summary["sections"].append(sectionSummary)
|
|
else:
|
|
# Fallback: check for sections directly in root
|
|
sections = jsonData.get("sections", [])
|
|
if sections:
|
|
summary["statistics"]["sectionCount"] = len(sections)
|
|
for section in sections:
|
|
sectionSummary = {
|
|
"id": section.get("id"),
|
|
"content_type": section.get("content_type"),
|
|
"title": section.get("title"),
|
|
"order": section.get("order")
|
|
}
|
|
|
|
# Get elements for processing
|
|
elements = section.get("elements", [])
|
|
|
|
if section.get("content_type") == "table":
|
|
# Try to extract from elements first
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
tableElement = elements[0]
|
|
# Ensure tableElement is a dictionary before accessing
|
|
if isinstance(tableElement, dict):
|
|
content = tableElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
headers = content.get("headers", [])
|
|
rows = content.get("rows", [])
|
|
else:
|
|
headers = tableElement.get("headers", [])
|
|
rows = tableElement.get("rows", [])
|
|
if headers:
|
|
sectionSummary["columnCount"] = len(headers)
|
|
sectionSummary["headers"] = headers
|
|
if rows:
|
|
sectionSummary["rowCount"] = len(rows)
|
|
sectionSummary["caption"] = tableElement.get("caption") or (content.get("caption") if isinstance(content, dict) else None)
|
|
else:
|
|
# Fallback: extract KPIs from section metadata if elements are missing
|
|
# This handles cases where filledStructure doesn't have elements populated
|
|
if "columnCount" in section:
|
|
sectionSummary["columnCount"] = section.get("columnCount")
|
|
if "rowCount" in section:
|
|
sectionSummary["rowCount"] = section.get("rowCount")
|
|
if "headers" in section:
|
|
sectionSummary["headers"] = section.get("headers")
|
|
if "caption" in section:
|
|
sectionSummary["caption"] = section.get("caption")
|
|
|
|
# For lists and bullet_lists: extract item count
|
|
elif section.get("content_type") in ["list", "bullet_list"]:
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
listElement = elements[0]
|
|
# Ensure listElement is a dictionary before accessing
|
|
if isinstance(listElement, dict):
|
|
content = listElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
items = content.get("items", [])
|
|
else:
|
|
items = listElement.get("items", [])
|
|
if items:
|
|
sectionSummary["itemCount"] = len(items)
|
|
else:
|
|
# Fallback: extract KPIs from section metadata if elements are missing
|
|
if "itemCount" in section:
|
|
sectionSummary["itemCount"] = section.get("itemCount")
|
|
|
|
# For paragraphs/headings: extract text statistics (no preview for security)
|
|
elif section.get("content_type") in ["paragraph", "heading"]:
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
textElement = elements[0]
|
|
# Ensure textElement is a dictionary before accessing
|
|
if isinstance(textElement, dict):
|
|
content = textElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
text = content.get("text", "")
|
|
else:
|
|
text = textElement.get("text", "")
|
|
if text:
|
|
sectionSummary["textLength"] = len(text)
|
|
sectionSummary["wordCount"] = len(text.split())
|
|
if section.get("textLength"):
|
|
sectionSummary["textLength"] = section.get("textLength")
|
|
|
|
# For code blocks: extract code statistics (no preview for security)
|
|
elif section.get("content_type") == "code_block":
|
|
if elements and isinstance(elements, list) and len(elements) > 0:
|
|
codeElement = elements[0]
|
|
content = codeElement.get("content", {})
|
|
if isinstance(content, dict):
|
|
code = content.get("code", "")
|
|
language = content.get("language", "")
|
|
if code:
|
|
sectionSummary["codeLength"] = len(code)
|
|
sectionSummary["codeLineCount"] = code.count('\n') + 1
|
|
if language:
|
|
sectionSummary["language"] = language
|
|
|
|
# Wenn contentPartIds vorhanden sind, aber keine elements: Füge ContentParts-Metadaten hinzu
|
|
contentPartIds = section.get("contentPartIds", [])
|
|
if contentPartIds and not elements:
|
|
# Prüfe ob contentPartsMetadata vorhanden ist
|
|
contentPartsMetadata = section.get("contentPartsMetadata", [])
|
|
if contentPartsMetadata:
|
|
sectionSummary["contentPartsMetadata"] = contentPartsMetadata
|
|
else:
|
|
# Fallback: Zeige nur IDs wenn Metadaten nicht verfügbar
|
|
sectionSummary["contentPartIds"] = contentPartIds
|
|
sectionSummary["note"] = "ContentParts referenced but metadata not available"
|
|
|
|
# Include any additional fields from section (generic approach)
|
|
# BUT exclude type-specific KPIs that don't belong to this content_type
|
|
contentType = section.get("content_type", "")
|
|
# Define KPIs that are ONLY valid for specific types
|
|
typeExclusiveKpis = {
|
|
"table": ["columnCount", "rowCount", "headers"], # Only for tables
|
|
"bullet_list": ["itemCount"], # Only for bullet_list
|
|
"list": ["itemCount"] # Only for list
|
|
}
|
|
excludedKpis = []
|
|
for kpiType, kpiFields in typeExclusiveKpis.items():
|
|
if kpiType != contentType:
|
|
excludedKpis.extend(kpiFields)
|
|
|
|
for key, value in section.items():
|
|
if key not in sectionSummary and key not in ["elements"] and key not in excludedKpis:
|
|
# Don't copy type-specific KPIs if they're 0/empty and we didn't extract them ourselves
|
|
# This prevents copying columnCount: 0, rowCount: 0, headers: [] from structure generation phase
|
|
if key in ["columnCount", "rowCount", "headers", "itemCount"]:
|
|
# Skip if it's 0/empty - we'll only include KPIs we extracted from elements
|
|
if isinstance(value, int) and value == 0:
|
|
continue
|
|
if isinstance(value, list) and len(value) == 0:
|
|
continue
|
|
|
|
# Include simple types (str, int, float, bool, list of primitives)
|
|
if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10):
|
|
sectionSummary[key] = value
|
|
|
|
summary["sections"].append(sectionSummary)
|
|
|
|
# Extract statistics from root level (generic - include all statistics fields)
|
|
rootStatistics = jsonData.get("statistics", {})
|
|
if rootStatistics and isinstance(rootStatistics, dict):
|
|
# Merge root statistics into summary statistics
|
|
summary["statistics"].update(rootStatistics)
|
|
|
|
return summary
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error summarizing JSON structure: {str(e)}")
|
|
return {"error": str(e), "type": "error"}
|
|
|
|
def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
    """
    Analyze documents for validation - includes metadata AND JSON structure summary.
    JSON summary provides structure information (sections, tables with captions, IDs) without full content.

    Each returned entry always carries "name", "mimeType", "format" and "size";
    depending on the document it may also carry "jsonStructure", "note",
    "contentPreview" or (on per-document failure) "error". A failure while
    analyzing one document never aborts the loop - an error entry is appended
    instead.
    """
    if not documents:
        return []

    summaries = []
    for doc in documents:
        try:
            # Basic identity/metadata; getattr defaults keep us safe on partial objects.
            name = getattr(doc, 'documentName', 'Unknown')
            mimeType = getattr(doc, 'mimeType', 'unknown')
            formatExt = self._detectFormat(doc)
            sizeInfo = self._calculateSize(doc)

            summary = {
                "name": name,
                "mimeType": mimeType,
                "format": formatExt,
                "size": sizeInfo["readable"]
            }

            # Extract JSON structure summary - prioritize sourceJson for rendered documents
            sourceJson = getattr(doc, 'sourceJson', None)
            data = getattr(doc, 'documentData', None)

            # IMPORTANT: For rendered documents (HTML, PDF, DOCX, etc.), jsonStructure is METADATA about the structure,
            # NOT the actual rendered content. The actual content is in documentData.
            # Include both: jsonStructure for structure metadata, and contentPreview for actual content check
            if sourceJson and isinstance(sourceJson, dict):
                # Check if this is code generation metadata (has statistics field)
                if "statistics" in sourceJson and "fileType" in sourceJson:
                    # Code generation format - extract statistics from metadata
                    codeStats = sourceJson.get("statistics", {})
                    jsonSummary = {
                        "metadata": sourceJson,
                        "sections": [],
                        "statistics": codeStats
                    }
                    summary["jsonStructure"] = jsonSummary
                    summary["note"] = "jsonStructure contains metadata and statistics for code generation file. Actual rendered content is in documentData."
                else:
                    # Document generation format - use standard structure analysis
                    jsonSummary = self._summarizeJsonStructure(sourceJson)
                    summary["jsonStructure"] = jsonSummary
                    summary["note"] = "jsonStructure contains metadata about document structure. Actual rendered content is in documentData."

                # For rendered documents, also check actual content
                if data is not None:
                    contentPreview = self._getContentPreview(data, formatExt, mimeType)
                    if contentPreview:
                        summary["contentPreview"] = contentPreview
            elif data is not None:
                # For code generation files without sourceJson, extract statistics from content
                if formatExt in ["csv", "json", "xml"]:
                    codeStats = self._extractCodeFileStatistics(data, formatExt, mimeType)
                    if codeStats:
                        jsonSummary = {
                            "metadata": {},
                            "sections": [],
                            "statistics": codeStats
                        }
                        summary["jsonStructure"] = jsonSummary
                        summary["note"] = "jsonStructure contains statistics extracted from code file content."
                # Fallback: try to parse documentData as JSON (for non-rendered documents)
                elif isinstance(data, dict):
                    # Summarize JSON structure
                    jsonSummary = self._summarizeJsonStructure(data)
                    summary["jsonStructure"] = jsonSummary
                elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
                    # Handle list of documents - only the first entry is summarized
                    jsonSummary = self._summarizeJsonStructure(data[0])
                    summary["jsonStructure"] = jsonSummary
                else:
                    # For non-JSON data (e.g., rendered HTML), get content preview
                    contentPreview = self._getContentPreview(data, formatExt, mimeType)
                    if contentPreview:
                        summary["contentPreview"] = contentPreview

            summaries.append(summary)
        except Exception as e:
            # Per-document failure: record an error entry and keep going.
            logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
            summaries.append({
                "name": getattr(doc, 'documentName', 'Unknown'),
                "mimeType": getattr(doc, 'mimeType', 'unknown'),
                "format": "unknown",
                "size": "0 B",
                "error": str(e)
            })

    return summaries
|
|
|
|
def _detectFormat(self, doc: Any) -> str:
|
|
"""Extract format from filename extension (always use extension)"""
|
|
try:
|
|
docName = getattr(doc, 'documentName', '')
|
|
|
|
# Extract from filename extension
|
|
if docName and '.' in docName:
|
|
ext = docName.rsplit('.', 1)[1].lower()
|
|
return ext
|
|
|
|
return 'unknown'
|
|
except Exception as e:
|
|
logger.warning(f"Error detecting format: {str(e)}")
|
|
return 'unknown'
|
|
|
|
def _calculateSize(self, doc: Any) -> Dict[str, Any]:
    """Calculate document size in bytes and human-readable format.

    Measures doc.documentData: str is measured as UTF-8, bytes are counted
    directly, dict/list are measured via their JSON serialization (falling
    back to str() when serialization fails), anything else via str().

    Returns:
        {"bytes": <int>, "readable": <str>}; {"bytes": 0, "readable": "0 B"}
        when the data is missing/None or an error occurs.
    """
    try:
        # getattr default covers both "attribute missing" and "is None".
        data = getattr(doc, 'documentData', None)
        if data is None:
            return {"bytes": 0, "readable": "0 B"}

        if isinstance(data, str):
            size_bytes = len(data.encode('utf-8'))
        elif isinstance(data, bytes):
            size_bytes = len(data)
        elif isinstance(data, (dict, list)):
            # Estimate JSON size
            try:
                size_bytes = len(json.dumps(data).encode('utf-8'))
            except (TypeError, ValueError):
                # Fix: was a bare `except:` that also swallowed BaseException
                # (KeyboardInterrupt/SystemExit); only serialization failures
                # should trigger the str() fallback.
                size_bytes = len(str(data).encode('utf-8'))
        else:
            size_bytes = len(str(data).encode('utf-8'))

        # Convert to human-readable format
        return {"bytes": size_bytes, "readable": self._formatBytes(size_bytes)}
    except Exception as e:
        logger.warning(f"Error calculating size: {str(e)}")
        return {"bytes": 0, "readable": "0 B"}
|
|
|
|
def _formatBytes(self, bytes: int) -> str:
|
|
"""Format bytes to human-readable string"""
|
|
for unit in ['B', 'KB', 'MB', 'GB']:
|
|
if bytes < 1024.0:
|
|
return f"{bytes:.1f} {unit}"
|
|
bytes /= 1024.0
|
|
return f"{bytes:.1f} TB"
|
|
|
|
def _getContentPreview(self, data: Any, formatExt: str, mimeType: str) -> Optional[Dict[str, Any]]:
|
|
"""Get structural validation info for rendered documents (generic, NO content preview for security/privacy)
|
|
|
|
Returns metadata about document structure to help validation distinguish between:
|
|
- Structure metadata (jsonStructure) - describes what should be rendered
|
|
- Actual rendered content (documentData) - the actual document file
|
|
|
|
Does NOT expose actual content, only structural indicators.
|
|
"""
|
|
try:
|
|
if data is None:
|
|
return None
|
|
|
|
preview = {}
|
|
|
|
# Generic content type detection
|
|
if isinstance(data, bytes):
|
|
preview["dataType"] = "bytes"
|
|
preview["contentLength"] = len(data)
|
|
# Check if it's likely text-based (for text formats like HTML, TXT, etc.)
|
|
try:
|
|
# Try to decode as UTF-8 to check if it's text-based
|
|
decoded = data.decode('utf-8', errors='strict')
|
|
preview["isTextBased"] = True
|
|
preview["contentLength"] = len(decoded)
|
|
|
|
# For text-based formats, check if it looks like rendered content vs JSON metadata
|
|
# JSON metadata typically starts with { or [ and contains structure keywords
|
|
trimmed = decoded.strip()
|
|
looksLikeJson = (trimmed.startswith('{') or trimmed.startswith('[')) and \
|
|
('"sections"' in trimmed or '"contentPartIds"' in trimmed or '"generationHint"' in trimmed)
|
|
preview["looksLikeRenderedContent"] = not looksLikeJson
|
|
|
|
except UnicodeDecodeError:
|
|
# Not valid UTF-8, likely binary (PDF, DOCX, images, etc.)
|
|
preview["isTextBased"] = False
|
|
preview["isBinary"] = True
|
|
# Binary files with content are rendered (not metadata)
|
|
preview["looksLikeRenderedContent"] = True
|
|
|
|
elif isinstance(data, str):
|
|
preview["dataType"] = "string"
|
|
preview["isTextBased"] = True
|
|
preview["contentLength"] = len(data)
|
|
|
|
# Check if it looks like rendered content vs JSON metadata
|
|
trimmed = data.strip()
|
|
looksLikeJson = (trimmed.startswith('{') or trimmed.startswith('[')) and \
|
|
('"sections"' in trimmed or '"contentPartIds"' in trimmed or '"generationHint"' in trimmed)
|
|
preview["looksLikeRenderedContent"] = not looksLikeJson
|
|
|
|
elif isinstance(data, (dict, list)):
|
|
# If documentData is still a dict/list, it's likely structure metadata, not rendered content
|
|
preview["dataType"] = "json"
|
|
preview["isTextBased"] = True
|
|
preview["looksLikeRenderedContent"] = False
|
|
preview["note"] = "documentData is JSON structure, not rendered document file"
|
|
else:
|
|
preview["dataType"] = type(data).__name__
|
|
preview["contentLength"] = len(str(data)) if hasattr(data, '__len__') else 0
|
|
|
|
return preview if preview else None
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error getting content structure info: {str(e)}")
|
|
return None
|
|
|
|
def _extractCodeFileStatistics(self, data: Any, formatExt: str, mimeType: str) -> Optional[Dict[str, Any]]:
|
|
"""Extract statistics from code generation files (CSV, JSON, XML) for validation."""
|
|
try:
|
|
# Convert bytes to string if needed
|
|
content = None
|
|
if isinstance(data, bytes):
|
|
try:
|
|
content = data.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
return None
|
|
elif isinstance(data, str):
|
|
content = data
|
|
else:
|
|
return None
|
|
|
|
if not content:
|
|
return None
|
|
|
|
stats = {}
|
|
|
|
if formatExt == "csv":
|
|
import csv
|
|
import io
|
|
try:
|
|
reader = csv.reader(io.StringIO(content))
|
|
rows = list(reader)
|
|
if rows:
|
|
headerRow = rows[0]
|
|
stats["rowCount"] = len(rows) - 1 # Exclude header
|
|
stats["columnCount"] = len(headerRow)
|
|
stats["headerRow"] = headerRow
|
|
stats["dataRowCount"] = len(rows) - 1
|
|
except Exception as e:
|
|
logger.debug(f"CSV statistics extraction failed: {e}")
|
|
|
|
elif formatExt == "json":
|
|
try:
|
|
parsed = json.loads(content)
|
|
stats["isArray"] = isinstance(parsed, list)
|
|
stats["isObject"] = isinstance(parsed, dict)
|
|
if isinstance(parsed, list):
|
|
stats["itemCount"] = len(parsed)
|
|
stats["objectCount"] = sum(1 for item in parsed if isinstance(item, dict))
|
|
stats["arrayCount"] = sum(1 for item in parsed if isinstance(item, list))
|
|
elif isinstance(parsed, dict):
|
|
stats["keyCount"] = len(parsed)
|
|
stats["keys"] = list(parsed.keys())
|
|
stats["objectCount"] = sum(1 for v in parsed.values() if isinstance(v, dict))
|
|
stats["arrayCount"] = sum(1 for v in parsed.values() if isinstance(v, list))
|
|
except Exception as e:
|
|
logger.debug(f"JSON statistics extraction failed: {e}")
|
|
|
|
elif formatExt == "xml":
|
|
try:
|
|
import xml.etree.ElementTree as ET
|
|
root = ET.fromstring(content)
|
|
stats["elementCount"] = len(list(root.iter()))
|
|
stats["attributeCount"] = sum(len(elem.attrib) for elem in root.iter())
|
|
stats["rootElement"] = root.tag
|
|
stats["hasRoot"] = True
|
|
except Exception as e:
|
|
logger.debug(f"XML statistics extraction failed: {e}")
|
|
|
|
return stats if stats else None
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error extracting code file statistics: {str(e)}")
|
|
return None
|
|
|
|
def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
|
|
"""
|
|
Generic format compatibility check.
|
|
- txt/md/html are text formats (compatible with each other)
|
|
- pdf/docx/xlsx are document formats (not compatible with each other)
|
|
- json/xml are structured formats
|
|
- images are image formats
|
|
"""
|
|
deliveredLower = deliveredFormat.lower()
|
|
expectedLower = expectedFormat.lower()
|
|
|
|
# Exact match
|
|
if deliveredLower == expectedLower:
|
|
return True
|
|
|
|
# Text formats are interchangeable
|
|
textFormats = ['txt', 'md', 'html', 'text', 'plain']
|
|
if deliveredLower in textFormats and expectedLower in textFormats:
|
|
return True
|
|
|
|
# Structured formats
|
|
if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
|
|
return True
|
|
|
|
# Document formats are NOT compatible with each other
|
|
documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
|
|
if deliveredLower in documentFormats and expectedLower in documentFormats:
|
|
return False # pdf ≠ docx
|
|
|
|
return False
|
|
|
|
async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None, actionParameters: Optional[Dict[str, Any]] = None, actionHistory: Optional[List[Dict[str, Any]]] = None, context: Optional[Any] = None) -> Dict[str, Any]:
    """AI-based comprehensive validation - generic approach

    Builds a metadata-only validation prompt from the task objective, action
    context, action history and document summaries, sends it to the AI
    planning service once, and parses the JSON verdict (single attempt;
    parse-or-fail).

    Args:
        documents: Documents to validate (summarized via self._analyzeDocuments).
        intent: Workflow-level intent dict; fallback source for dataType,
            expectedFormats and successCriteria.
        taskStep: Optional TaskStep object or dict; preferred source for
            objective, dataType, expectedFormats and successCriteria.
        actionName: Optional name of the action that created the documents.
        actionParameters: Optional parameters used by that action.
        actionHistory: Optional list of previously executed workflow actions.
        context: Optional context object; when truthy, the current round's
            document index is appended to the prompt.

    Returns:
        Normalized validation dict with keys: overallSuccess, qualityScore,
        documentCount, gapAnalysis, gapType, structureComparison,
        criteriaMapping, validationDetails, improvementSuggestions,
        schemaCompliant, originalType, missingFields. overallSuccess and
        qualityScore become None (and are listed in missingFields, with
        schemaCompliant=False) when the AI omitted them. If the AI service
        is unavailable, returns the standardized failure result instead.

    Raises:
        ValueError: If the AI response is empty, contains no JSON, or the
            JSON cannot be parsed even after boolean normalization.
        Exception: Any other error is logged and re-raised.
    """
    try:
        # Without an AI service there is nothing to validate with - fail soft
        # with the standardized failure payload rather than raising.
        if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
            return self._createFailedValidationResult("AI service not available")

        # Use taskStep.objective if available, otherwise fall back to workflow intent
        taskObjective = None
        if taskStep and hasattr(taskStep, 'objective'):
            taskObjective = taskStep.objective
        elif taskStep and isinstance(taskStep, dict):
            taskObjective = taskStep.get('objective')

        # Use taskStep format fields if available, otherwise fall back to intent
        dataType = None
        expectedFormats = None
        if taskStep:
            if hasattr(taskStep, 'dataType') and taskStep.dataType:
                dataType = taskStep.dataType
            elif isinstance(taskStep, dict):
                dataType = taskStep.get('dataType')
            if hasattr(taskStep, 'expectedFormats') and taskStep.expectedFormats:
                expectedFormats = taskStep.expectedFormats
            elif isinstance(taskStep, dict):
                expectedFormats = taskStep.get('expectedFormats')

        # Fallback to intent if taskStep format fields not available
        if not dataType:
            dataType = intent.get('dataType', 'unknown')
        if not expectedFormats:
            expectedFormats = intent.get('expectedFormats', [])

        # Determine objective text and label
        # getattr with a default keeps this safe when workflow has no stored intent.
        workflowIntent = getattr(self.services.workflow, '_workflowIntent', {}) if hasattr(self.services, 'workflow') and self.services.workflow else {}
        intentText = workflowIntent.get('intent', 'Unknown')
        objectiveText = taskObjective if taskObjective else intentText
        objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"

        # Build prompt base WITHOUT document summaries first
        # Use success criteria from taskStep if available, otherwise from intent
        successCriteria = []
        if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
            successCriteria = taskStep.successCriteria
        elif taskStep and isinstance(taskStep, dict):
            successCriteria = taskStep.get('successCriteria', [])
        else:
            successCriteria = intent.get('successCriteria', [])
        # NOTE(review): criteriaCount does not appear to be used below - confirm
        # before removing.
        criteriaCount = len(successCriteria)

        # Build action name context with human-readable description
        actionContext = ""
        if actionName:
            # Convert action name to human-readable format
            # (e.g. "ai.webResearch" -> "Webresearch").
            actionDescription = actionName.replace("ai.", "").replace(".", " ").title()
            actionContext = f"\nDOCUMENTS CREATED BY: {actionDescription} ({actionName})"

        # Build action parameters context
        actionParamsContext = ""
        if actionParameters and isinstance(actionParameters, dict) and len(actionParameters) > 0:
            # Filter out documentList and other large/redundant parameters for clarity
            relevantParams = {k: v for k, v in actionParameters.items()
                              if k not in ['documentList', 'connections'] and v is not None}
            if relevantParams:
                paramsJson = json.dumps(relevantParams, ensure_ascii=False, indent=2)
                actionParamsContext = f"\nACTION PARAMETERS USED: {paramsJson}"

        # Extract validation metadata from documents (action-specific context)
        validationMetadataContext = ""
        if documents:
            metadataList = []
            for doc in documents:
                metadata = getattr(doc, 'validationMetadata', None)
                if metadata and isinstance(metadata, dict):
                    metadataList.append(metadata)

            if metadataList:
                # Combine all metadata (usually just one document)
                # Later documents' keys overwrite earlier ones on collision.
                combinedMetadata = {}
                for meta in metadataList:
                    combinedMetadata.update(meta)

                if combinedMetadata:
                    metadataJson = json.dumps(combinedMetadata, ensure_ascii=False, indent=2)
                    validationMetadataContext = f"\nACTION VALIDATION METADATA: {metadataJson}"

        # Build action history context (for multi-step workflow validation)
        actionHistoryContext = ""
        if actionHistory and isinstance(actionHistory, list) and len(actionHistory) > 0:
            historyEntries = []
            for entry in actionHistory:
                if isinstance(entry, dict):
                    action = entry.get('action', 'unknown')
                    params = entry.get('parameters', {}) or {}
                    step = entry.get('step', 0)
                    # Filter out documentList for clarity
                    relevantParams = {k: v for k, v in params.items() if k not in ['documentList', 'connections'] and v is not None}
                    paramsStr = json.dumps(relevantParams, ensure_ascii=False) if relevantParams else "{}"
                    historyEntries.append(f"Step {step}: {action} {paramsStr}")
                elif isinstance(entry, str):
                    # Pre-formatted history entries are passed through as-is.
                    historyEntries.append(entry)

            if historyEntries:
                actionHistoryContext = f"\n\n=== ACTION HISTORY ===\n" + "\n".join(f"- {entry}" for entry in historyEntries)
                actionHistoryContext += "\n\nIMPORTANT: This shows the complete workflow that produced the documents. For process-oriented criteria (e.g., 'internet search performed'), check ACTION HISTORY first. Document metadata may only reflect the LAST action, not the entire workflow."

        # Build document index context (all documents delivered in current round)
        documentIndexContext = ""
        if context and self.services and hasattr(self.services, 'chat') and hasattr(self.services, 'workflow') and self.services.workflow:
            try:
                documentIndex = self.services.chat.getAvailableDocuments(self.services.workflow)
                if documentIndex and documentIndex.strip() and documentIndex != "No documents available":
                    # Extract only "Current round documents" section if present
                    lines = documentIndex.split('\n')
                    currentRoundSection = []
                    inCurrentRound = False
                    for line in lines:
                        if "Current round documents:" in line:
                            inCurrentRound = True
                            currentRoundSection.append(line)
                        elif inCurrentRound:
                            # NOTE(review): after strip() a line can never start
                            # with a space, so the ' - docItem:' alternative looks
                            # unreachable - confirm intended prefixes.
                            if line.strip().startswith("- docList:") or line.strip().startswith(" - docItem:") or line.strip().startswith("- docItem:"):
                                currentRoundSection.append(line)
                            elif line.strip() == "":
                                # Empty line is okay, continue
                                continue
                            elif "Past rounds documents:" in line or "AVAILABLE_CONNECTIONS_INDEX:" in line:
                                # End of current round section
                                break
                            else:
                                # Still in current round section
                                currentRoundSection.append(line)

                    if currentRoundSection:
                        documentIndexContext = "\n\n=== ALL DOCUMENTS DELIVERED IN CURRENT ROUND ===\n" + "\n".join(currentRoundSection)
                        documentIndexContext += "\n\nIMPORTANT: This shows ALL documents that have been delivered in the current round, not just the ones being validated in this step. Use this to check if all required formats/documents are present across the entire round."
            except Exception as e:
                logger.warning(f"Error extracting document index for validation: {str(e)}")
                # Continue without document index - not critical

        # Transform criteria that require data access into metadata-only checks
        transformedCriteria = self._transformCriteriaForMetadataOnly(successCriteria)

        # Format success criteria for display with index numbers
        if transformedCriteria:
            criteriaDisplay = "\n".join([f"[{i}] {criterion}" for i, criterion in enumerate(transformedCriteria)])
        else:
            criteriaDisplay = "[]"

        promptBase = f"""TASK VALIDATION

=== TASK INFORMATION ===
{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMATS: {expectedFormats if expectedFormats else ['any']}{actionContext}{actionParamsContext}{validationMetadataContext}{actionHistoryContext}{documentIndexContext}

=== VALIDATION INSTRUCTIONS ===

CRITICAL: Validate ONLY metadata/structure. Documents may be binary (PDF, DOCX, images) or very large (200MB+). NEVER try to read or validate actual content values.

VALIDATION RULES:
1. METADATA ONLY: Use jsonStructure (sections, contentPartIds, content_type, statistics) and contentPreview (dataType, contentLength, looksLikeRenderedContent) for validation. These are METADATA indicators, NOT actual content.
2. FORMAT VALIDATION: Check mimeType/format metadata only. Do NOT inspect content to determine format. Format mismatch = wrong_format gap.
3. CONTENT EXISTENCE: Use contentPreview.looksLikeRenderedContent=true to confirm content exists. Use jsonStructure.content_type to confirm data types exist (e.g., "image" section = image exists, "bullet_list" section = bullet list exists, "table" section = table exists). If a section with a content_type exists, the content has been delivered. Do NOT assume content was AI-generated vs extracted - if the section exists, it was delivered.
4. STRUCTURE VALIDATION: Use jsonStructure.sections, statistics (counts, rowCount, columnCount, itemCount) as evidence. Trust structure metadata over format claims. Only check KPIs if they are present (missing KPIs mean elements not yet populated, not that content is missing).
5. PROCESS VALIDATION: Use ACTION HISTORY for process-oriented criteria (e.g., "search performed", "extraction done").
6. ONE CRITERION PER EVALUATION: Evaluate each criterion independently. Do not mention other criteria.
7. NO ASSUMPTIONS: Do NOT assume content was AI-generated vs extracted. If a section exists with content_type, the content was delivered. Only validate what is present in the metadata.
8. DATA-LEVEL CRITERIA TRANSFORMATION: Criteria mentioning accuracy percentages (e.g., "95% accuracy"), completeness percentages (e.g., "98% completeness"), or "all X extracted" have been transformed to metadata-only checks. For accuracy/completeness: Check if contentPartIds reference all source documents and if structure metadata shows expected data types (tables, lists, etc.) exist. For "all X extracted": Check if contentPartIds reference all source documents mentioned in ACTION HISTORY or document index. NEVER attempt to verify accuracy/completeness by comparing actual data values - only use metadata indicators.

VALIDATION STEPS:
- Check ACTION HISTORY for process-oriented criteria
- Check jsonStructure metadata (sections, content_type, statistics) for structure validation
- Check contentPreview.looksLikeRenderedContent for content existence (not quality)
- Check mimeType/format for format validation
- NEVER try to read actual content values (binary files, large files, data accuracy)
- Classify gaps: missing_data, incomplete_data, wrong_structure, wrong_format

SCORING:
- Data complete + structure matches → qualityScore: 0.9-1.0
- Data complete but format issues → qualityScore: 0.7-0.9
- Missing/incomplete data → qualityScore: <0.7
- Format mismatch only (data present) → qualityScore: 0.6-0.7

SUGGESTIONS:
- ONE suggestion per UNMET criterion, ordered by criteriaMapping index
- Reference actual structure values found and required values
- Calculate quantitative gaps when numbers are available
- Be specific and actionable based on structure evidence

=== OUTPUT FORMAT ===
{{
"overallSuccess": false,
"qualityScore": 0.0,
"dataTypeMatch": false,
"formatMatch": false,
"documentCount": {len(documents)},
"criteriaMapping": [
{{
"index": 0,
"criterion": "exact_criterion_text",
"met": false,
"reason": "explanation_for_this_criterion_only"
}}
],
"gapAnalysis": "Brief gap summary",
"gapType": "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap",
"structureComparison": {{
"required": {{}},
"found": {{}},
"gap": {{}}
}},
"improvementSuggestions": ["One suggestion per unmet criterion"],
"validationDetails": [
{{
"documentName": "name.ext",
"issues": ["Specific issue"],
"suggestions": ["Specific fix"]
}}
]
}}

=== DATA ===

SUCCESS CRITERIA TO VALIDATE in criteriaMapping array:
{criteriaDisplay}

DELIVERED DOCUMENTS ({len(documents)} items):
"""

        # Analyze documents
        documentSummaries = self._analyzeDocuments(documents)

        # Build final prompt with summaries at the end
        documentsJson = json.dumps(documentSummaries, indent=2, ensure_ascii=False)
        validationPrompt = promptBase + documentsJson

        # Call AI service for validation
        response = await self.services.ai.callAiPlanning(
            prompt=validationPrompt,
            placeholders=None,
            debugType="contentvalidation"
        )

        if not response or not response.strip():
            logger.warning("AI validation returned empty response")
            raise ValueError("AI validation failed - empty response")

        # Clean and extract JSON from response using proper JSON extraction utility
        # This handles nested structures and markdown code blocks correctly
        result = response.strip()
        logger.debug(f"AI validation response length: {len(result)}")

        # Extract JSON first
        extractedJson = self.services.utils.jsonExtractString(result)
        if not extractedJson:
            logger.debug(f"No JSON found in AI response: {result[:200]}...")
            logger.debug(f"Full AI response: {result}")
            raise ValueError("AI validation failed - no JSON in response")

        # Proactively fix Python-style booleans (False/True -> false/true) BEFORE parsing
        # This handles booleans in any context: standalone, in lists, in dicts, etc.
        # Use word boundaries but also handle cases where booleans are in brackets/arrays
        # Replace False/True regardless of context (word boundary handles string matching correctly)
        normalizedJson = re.sub(r'\bFalse\b', 'false', extractedJson)
        normalizedJson = re.sub(r'\bTrue\b', 'true', normalizedJson)

        logger.debug(f"Extracted JSON (before normalization): {extractedJson[:200]}...")
        logger.debug(f"Normalized JSON (after boolean fix): {normalizedJson[:200]}...")

        # Now try to parse the normalized JSON
        try:
            aiResult = json.loads(normalizedJson)
            logger.info("AI validation JSON parsed successfully")
        except json.JSONDecodeError as json_error:
            logger.warning(f"AI validation invalid JSON after normalization: {str(json_error)}")
            logger.debug(f"JSON content that failed: {normalizedJson[:500]}...")
            raise ValueError(f"AI validation failed - invalid JSON: {str(json_error)}")

        # Pull the individual verdict fields out of the parsed AI result.
        overall = aiResult.get("overallSuccess")
        quality = aiResult.get("qualityScore")
        details = aiResult.get("validationDetails")
        gap = aiResult.get("gapAnalysis", "")
        improvements = aiResult.get("improvementSuggestions", [])
        gap_type = aiResult.get("gapType", "")
        structure_comp = aiResult.get("structureComparison", {})
        criteria_mapping = aiResult.get("criteriaMapping", [])

        # Normalize while keeping failures explicit
        # NOTE(review): the prompt asks the AI for dataTypeMatch/formatMatch but
        # they are not propagated into this dict - confirm whether callers need them.
        normalized = {
            # Strict type checks: anything but a real bool/number becomes None
            # and is reported below via missingFields.
            "overallSuccess": overall if isinstance(overall, bool) else None,
            "qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
            "documentCount": len(documentSummaries),
            "gapAnalysis": gap if gap else "",
            "gapType": gap_type if gap_type else "",
            "structureComparison": structure_comp if structure_comp else {},
            "criteriaMapping": criteria_mapping if isinstance(criteria_mapping, list) else [],
            # Fall back to a minimal single-entry detail list when the AI
            # returned a non-list value.
            "validationDetails": details if isinstance(details, list) else [{
                "documentName": "AI Validation",
                "gapAnalysis": gap
            }],
            "improvementSuggestions": improvements,
            "schemaCompliant": True,
            "originalType": "json",
            "missingFields": []
        }

        # Record which mandatory fields the AI omitted and flag the schema.
        if normalized["overallSuccess"] is None:
            normalized["missingFields"].append("overallSuccess")
        if normalized["qualityScore"] is None:
            normalized["missingFields"].append("qualityScore")
        if normalized["missingFields"]:
            normalized["schemaCompliant"] = False

        return normalized

    except Exception as e:
        # Log and re-raise so the caller decides how to handle validation failure.
        logger.error(f"AI validation failed: {str(e)}")
        raise
|
|
|
|
def _transformCriteriaForMetadataOnly(self, criteria: List[str]) -> List[str]:
|
|
"""
|
|
Transform criteria that require data access into metadata-only checks.
|
|
|
|
Preserves original criterion intent while converting data-level checks to metadata checks.
|
|
Examples:
|
|
- "95% accuracy" → "[METADATA ONLY] Data structure indicates extraction completed (check contentPartIds reference all source documents)"
|
|
- "98% completeness" → "[METADATA ONLY] All source documents referenced in contentPartIds (verify source count matches)"
|
|
- "all transactions extracted" → "[METADATA ONLY] All source documents referenced in contentPartIds (verify source count matches)"
|
|
"""
|
|
if not criteria:
|
|
return []
|
|
|
|
transformed = []
|
|
for criterion in criteria:
|
|
original = criterion.strip()
|
|
transformed_criterion = original
|
|
|
|
# Pattern: accuracy percentage (e.g., "95% accuracy", "accuracy meets or exceeds 95% threshold")
|
|
if re.search(r'\d+%?\s*accuracy|accuracy.*\d+%', original, re.IGNORECASE):
|
|
# Extract the main subject (e.g., "transactions", "data", etc.)
|
|
subject_match = re.search(r'(transactions?|data|items?|records?|entries?)', original, re.IGNORECASE)
|
|
subject = subject_match.group(1).lower() if subject_match else "data"
|
|
|
|
transformed_criterion = f"[METADATA ONLY] {original}: Check that contentPartIds reference all source documents and jsonStructure shows expected {subject} structure exists (tables/lists with rowCount/itemCount > 0). Cannot verify actual {subject} accuracy values from metadata."
|
|
|
|
# Pattern: completeness percentage or "all X extracted" (e.g., "98% completeness", "all transactions extracted")
|
|
elif re.search(r'\d+%?\s*completeness|completeness.*\d+%|all\s+.*extracted|extract.*all', original, re.IGNORECASE):
|
|
# Extract the main subject
|
|
subject_match = re.search(r'(transactions?|data|items?|records?|entries?|statements?|documents?)', original, re.IGNORECASE)
|
|
subject = subject_match.group(1).lower() if subject_match else "items"
|
|
|
|
transformed_criterion = f"[METADATA ONLY] {original}: Verify that contentPartIds reference all source documents mentioned in ACTION HISTORY/document index, and jsonStructure shows {subject} structure exists (check rowCount/itemCount in tables/lists). Cannot verify actual {subject} count from metadata."
|
|
|
|
# Pattern: "no missing data" or "no incorrect data"
|
|
elif re.search(r'no\s+missing|no\s+incorrect|no\s+errors?', original, re.IGNORECASE):
|
|
transformed_criterion = f"[METADATA ONLY] {original}: Check that jsonStructure.content_type shows expected data types present (tables, lists, etc.) and contentPreview.looksLikeRenderedContent=true. Cannot verify actual data values from metadata."
|
|
|
|
# Pattern: data accuracy without percentage (e.g., "data is accurate", "accurate data")
|
|
elif re.search(r'data.*accurate|accurate.*data', original, re.IGNORECASE) and '%' not in original:
|
|
transformed_criterion = f"[METADATA ONLY] {original}: Check that contentPartIds reference source documents and jsonStructure shows expected data structure exists. Cannot verify actual data accuracy values from metadata."
|
|
|
|
transformed.append(transformed_criterion)
|
|
|
|
return transformed
|
|
|
|
def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
|
|
"""Create a standardized failed validation result"""
|
|
return {
|
|
"overallSuccess": False,
|
|
"qualityScore": 0.0,
|
|
"dataTypeMatch": False,
|
|
"formatMatch": False,
|
|
"documentCount": 0,
|
|
"criteriaMapping": [],
|
|
"gapAnalysis": errorMessage,
|
|
"improvementSuggestions": [],
|
|
"validationDetails": [],
|
|
"schemaCompliant": True,
|
|
"originalType": "error",
|
|
"missingFields": [],
|
|
"error": errorMessage
|
|
}
|