# contentValidator.py
# Content validation for adaptive Dynamic mode
# Generic, document-aware validation system

import logging
import json
import base64
import re
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)

# Configuration constants
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024  # 50KB threshold
PREVIEW_SAMPLE_SIZE = 1024  # 1KB preview for large documents

class ContentValidator:
    """Validates delivered content against user intent - generic and document-aware"""

    def __init__(self, services=None, learningEngine=None):
        self.services = services
        self.learningEngine = learningEngine

    async def validateContent(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None, actionParameters: Optional[Dict[str, Any]] = None, actionHistory: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Validates delivered content against user intent using AI (single attempt; parse-or-fail)

        Args:
            documents: List of documents to validate
            intent: Workflow-level intent dict (for format requirements)
            taskStep: Optional TaskStep object (preferred source for objective)
            actionName: Optional action name (e.g., "ai.process", "ai.webResearch") that created the documents
            actionParameters: Optional action parameters used during execution (e.g., {"columnsPerRow": 10, "researchDepth": "deep"})
            actionHistory: Optional list of previously executed actions in the workflow (for multi-step workflow context)
        """
        return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters, actionHistory)

    def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
        """Generic document analysis - create simple summaries with metadata."""
        summaries = []
        for doc in documents:
            try:
                data = getattr(doc, 'documentData', None)
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                # Simple preview: if it's dict/list, dump JSON; otherwise use string
                preview = None
                if data is not None:
                    if isinstance(data, (dict, list)):
                        preview = json.dumps(data, indent=2, ensure_ascii=False)
                        # Truncate if too large
                        if len(preview) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = preview[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                    else:
                        text = str(data)
                        if len(text) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = text[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                        else:
                            preview = text

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"],
                    "preview": preview
                }
                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "preview": None,
                    "error": str(e)
                })
        return summaries

    def _calculateAvailablePromptSpace(self, basePromptSizeBytes: int) -> int:
        """Calculate available space for document summaries based on model context length."""
        try:
            from modules.aicore.aicoreModelRegistry import modelRegistry
            from modules.aicore.aicoreModelSelector import modelSelector
            from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum

            # Get available models
            availableModels = modelRegistry.getAvailableModels()

            # Create options for PLAN operation (what validation uses),
            # with balanced priority and basic processing mode
            from modules.datamodels.datamodelAi import PriorityEnum, ProcessingModeEnum
            options = AiCallOptions(
                operationType=OperationTypeEnum.PLAN,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC
            )

            # Get failover model list to find the model that will be used
            failoverModels = modelSelector.getFailoverModelList("", "", options, availableModels)

            if not failoverModels:
                # Fallback: assume 16K tokens context (conservative)
                logger.warning("No models available for space calculation, using fallback: 16K tokens")
                maxBytes = 16 * 1024 * 4  # 16K tokens * 4 bytes per token
            else:
                # Use the first (best) model
                model = failoverModels[0]
                # Calculate 80% of context length in bytes (tokens * 4 bytes per token)
                maxBytes = int(model.contextLength * 0.8 * 4)

            # Available space = (max - base prompt) minus a 10% safety margin
            availableBytes = int((maxBytes - basePromptSizeBytes) * 0.9)

            # Ensure minimum available space (at least 1KB)
            availableBytes = max(availableBytes, 1024)

            logger.debug(f"Prompt space calculation: base={basePromptSizeBytes} bytes, max={maxBytes} bytes, available={availableBytes} bytes")

            return availableBytes

        except Exception as e:
            logger.warning(f"Error calculating available prompt space: {str(e)}, using fallback: 8KB")
            # Fallback: assume 8KB available
            return 8 * 1024

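    # Worked example for _calculateAvailablePromptSpace (illustrative only; the 32,000-token
    # context window and 6 KB base prompt are hypothetical values, while the 4-bytes-per-token
    # heuristic and the 80% / 90% factors are the ones used above):
    #   maxBytes       = int(32000 * 0.8 * 4)        = 102,400 bytes
    #   availableBytes = int((102400 - 6000) * 0.9)  = 86,760 bytes
    # Document summaries appended to the validation prompt should stay within that budget.
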
    def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
        """Summarize JSON document structure for validation - extracts main objects, statistics, captions, and IDs."""
        try:
            if not isinstance(jsonData, dict):
                return {"type": "non-dict", "preview": str(jsonData)[:200]}

            summary = {
                "metadata": {},
                "sections": [],
                "statistics": {}
            }

            # Extract metadata - include ALL metadata fields (generic for all action types)
            metadata = jsonData.get("metadata", {})
            if metadata and isinstance(metadata, dict):
                # Include all metadata fields, not just specific ones
                summary["metadata"] = dict(metadata)

            # Extract documents array (if present)
            documents = jsonData.get("documents", [])
            if documents:
                summary["statistics"]["documentCount"] = len(documents)
                # Process first document (most common case)
                if len(documents) > 0:
                    doc = documents[0]
                    docSections = doc.get("sections", [])
                    summary["statistics"]["sectionCount"] = len(docSections)

                    # Summarize sections
                    for section in docSections:
                        sectionSummary = {
                            "id": section.get("id"),
                            "content_type": section.get("content_type"),
                            "title": section.get("title"),
                            "order": section.get("order")
                        }

                        # For tables: extract caption and statistics
                        if section.get("content_type") == "table":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                tableElement = elements[0]
                                sectionSummary["caption"] = tableElement.get("caption")
                                headers = tableElement.get("headers", [])
                                rows = tableElement.get("rows", [])
                                sectionSummary["columnCount"] = len(headers)
                                sectionSummary["rowCount"] = len(rows)
                                sectionSummary["headers"] = headers  # Include headers for context

                        # For lists: extract item count
                        elif section.get("content_type") == "list":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                listElement = elements[0]
                                items = listElement.get("items", [])
                                sectionSummary["itemCount"] = len(items)

                        # For paragraphs/headings: extract text preview
                        elif section.get("content_type") in ["paragraph", "heading"]:
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                textElement = elements[0]
                                text = textElement.get("text", "")
                                if text:
                                    sectionSummary["textPreview"] = text[:100] + ("..." if len(text) > 100 else "")

                        # Also check for textPreview directly in section (for web crawl results)
                        if section.get("textPreview"):
                            sectionSummary["textPreview"] = section.get("textPreview")

                        # Include any additional fields from section (generic approach)
                        # This ensures all action-specific fields are preserved
                        for key, value in section.items():
                            if key not in sectionSummary and key not in ["elements"]:  # Skip elements as they're processed separately
                                # Include simple types (str, int, float, bool, list of primitives)
                                if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10):
                                    sectionSummary[key] = value

                        summary["sections"].append(sectionSummary)
            else:
                # Fallback: check for sections directly in root
                sections = jsonData.get("sections", [])
                if sections:
                    summary["statistics"]["sectionCount"] = len(sections)
                    for section in sections:
                        sectionSummary = {
                            "id": section.get("id"),
                            "content_type": section.get("content_type"),
                            "title": section.get("title"),
                            "order": section.get("order")
                        }

                        if section.get("content_type") == "table":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                tableElement = elements[0]
                                sectionSummary["caption"] = tableElement.get("caption")
                                headers = tableElement.get("headers", [])
                                rows = tableElement.get("rows", [])
                                sectionSummary["columnCount"] = len(headers)
                                sectionSummary["rowCount"] = len(rows)
                                sectionSummary["headers"] = headers

                        # Include any additional fields from section (generic approach)
                        for key, value in section.items():
                            if key not in sectionSummary and key not in ["elements"]:  # Skip elements as they're processed separately
                                # Include simple types (str, int, float, bool, list of primitives)
                                if isinstance(value, (str, int, float, bool)) or (isinstance(value, list) and len(value) <= 10):
                                    sectionSummary[key] = value

                        summary["sections"].append(sectionSummary)

            # Extract statistics from root level (generic - include all statistics fields)
            rootStatistics = jsonData.get("statistics", {})
            if rootStatistics and isinstance(rootStatistics, dict):
                # Merge root statistics into summary statistics
                summary["statistics"].update(rootStatistics)

            return summary

        except Exception as e:
            logger.warning(f"Error summarizing JSON structure: {str(e)}")
            return {"error": str(e), "type": "error"}

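    # Illustrative input/output sketch for _summarizeJsonStructure (key names mirror what the
    # method reads above; the concrete values are hypothetical):
    #   input:  {"metadata": {"source": "ai.process"},
    #            "documents": [{"sections": [{"id": "s1", "content_type": "table", "title": "Overview",
    #                           "order": 1, "elements": [{"caption": "Comparison",
    #                           "headers": ["Name", "Price"], "rows": [["A", 1], ["B", 2]]}]}]}]}
    #   output: {"metadata": {"source": "ai.process"},
    #            "sections": [{"id": "s1", "content_type": "table", "title": "Overview", "order": 1,
    #                          "caption": "Comparison", "columnCount": 2, "rowCount": 2,
    #                          "headers": ["Name", "Price"]}],
    #            "statistics": {"documentCount": 1, "sectionCount": 1}}
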
    def _analyzeDocumentsWithSizeLimit(self, documents: List[Any], maxTotalBytes: int) -> List[Dict[str, Any]]:
        """
        Analyze documents for validation - includes metadata AND JSON structure summary.
        JSON summary provides structure information (sections, tables with captions, IDs) without full content.
        """
        # Note: maxTotalBytes is currently not enforced here - the summaries carry only
        # compact structure metadata rather than full document content.
        if not documents:
            return []

        summaries = []
        for doc in documents:
            try:
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"]
                }

                # Extract JSON structure summary - prioritize sourceJson for rendered documents
                sourceJson = getattr(doc, 'sourceJson', None)
                data = getattr(doc, 'documentData', None)

                if sourceJson and isinstance(sourceJson, dict):
                    # Use source JSON for structure analysis (for rendered documents like xlsx/docx/pdf)
                    jsonSummary = self._summarizeJsonStructure(sourceJson)
                    summary["jsonStructure"] = jsonSummary
                elif data is not None:
                    # Fallback: try to parse documentData as JSON (for non-rendered documents)
                    if isinstance(data, dict):
                        # Summarize JSON structure
                        jsonSummary = self._summarizeJsonStructure(data)
                        summary["jsonStructure"] = jsonSummary
                    elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
                        # Handle list of documents
                        jsonSummary = self._summarizeJsonStructure(data[0])
                        summary["jsonStructure"] = jsonSummary

                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "error": str(e)
                })

        return summaries

    def _detectFormat(self, doc: Any) -> str:
        """Extract format from filename extension (always use extension)"""
        try:
            docName = getattr(doc, 'documentName', '')

            # Extract from filename extension
            if docName and '.' in docName:
                ext = docName.rsplit('.', 1)[1].lower()
                return ext

            return 'unknown'
        except Exception as e:
            logger.warning(f"Error detecting format: {str(e)}")
            return 'unknown'

    def _calculateSize(self, doc: Any) -> Dict[str, Any]:
        """Calculate document size in bytes and human-readable format"""
        try:
            if not hasattr(doc, 'documentData') or doc.documentData is None:
                return {"bytes": 0, "readable": "0 B"}

            data = doc.documentData
            size_bytes = 0

            if isinstance(data, str):
                size_bytes = len(data.encode('utf-8'))
            elif isinstance(data, bytes):
                size_bytes = len(data)
            elif isinstance(data, (dict, list)):
                # Estimate JSON size; fall back to the string representation if serialization fails
                try:
                    json_str = json.dumps(data)
                    size_bytes = len(json_str.encode('utf-8'))
                except Exception:
                    size_bytes = len(str(data).encode('utf-8'))
            else:
                size_bytes = len(str(data).encode('utf-8'))

            # Convert to human-readable format
            readable = self._formatBytes(size_bytes)

            return {"bytes": size_bytes, "readable": readable}
        except Exception as e:
            logger.warning(f"Error calculating size: {str(e)}")
            return {"bytes": 0, "readable": "0 B"}

    def _formatBytes(self, numBytes: float) -> str:
        """Format bytes to human-readable string"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if numBytes < 1024.0:
                return f"{numBytes:.1f} {unit}"
            numBytes /= 1024.0
        return f"{numBytes:.1f} TB"

    def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
        """
        Generic format compatibility check.
        - txt/md/html are text formats (compatible with each other)
        - pdf/docx/xlsx are document formats (not compatible with each other)
        - json/xml are structured formats
        - images are image formats
        """
        deliveredLower = deliveredFormat.lower()
        expectedLower = expectedFormat.lower()

        # Exact match
        if deliveredLower == expectedLower:
            return True

        # Text formats are interchangeable
        textFormats = ['txt', 'md', 'html', 'text', 'plain']
        if deliveredLower in textFormats and expectedLower in textFormats:
            return True

        # Structured formats
        if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
            return True

        # Document formats are NOT compatible with each other
        documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
        if deliveredLower in documentFormats and expectedLower in documentFormats:
            return False  # pdf ≠ docx

        return False

    async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None, actionParameters: Optional[Dict[str, Any]] = None, actionHistory: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """AI-based comprehensive validation - generic approach"""
        try:
            if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
                return self._createFailedValidationResult("AI service not available")

            # Use taskStep.objective if available, otherwise fall back to intent.primaryGoal
            taskObjective = None
            if taskStep and hasattr(taskStep, 'objective'):
                taskObjective = taskStep.objective
            elif taskStep and isinstance(taskStep, dict):
                taskObjective = taskStep.get('objective')

            # Use taskStep format fields if available, otherwise fall back to intent
            dataType = None
            expectedFormats = None
            if taskStep:
                if hasattr(taskStep, 'dataType') and taskStep.dataType:
                    dataType = taskStep.dataType
                elif isinstance(taskStep, dict):
                    dataType = taskStep.get('dataType')
                if hasattr(taskStep, 'expectedFormats') and taskStep.expectedFormats:
                    expectedFormats = taskStep.expectedFormats
                elif isinstance(taskStep, dict):
                    expectedFormats = taskStep.get('expectedFormats')

            # Fallback to intent if taskStep format fields not available
            if not dataType:
                dataType = intent.get('dataType', 'unknown')
            if not expectedFormats:
                expectedFormats = intent.get('expectedFormats', [])

            # Determine objective text and label
            objectiveText = taskObjective if taskObjective else intent.get('primaryGoal', 'Unknown')
            objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"

            # Build prompt base WITHOUT document summaries first
            # Use success criteria from taskStep if available, otherwise from intent
            successCriteria = []
            if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
                successCriteria = taskStep.successCriteria
            elif taskStep and isinstance(taskStep, dict):
                successCriteria = taskStep.get('successCriteria', [])
            else:
                successCriteria = intent.get('successCriteria', [])
            criteriaCount = len(successCriteria)

            # Build action name context with human-readable description
            actionContext = ""
            if actionName:
                # Convert action name to human-readable format
                actionDescription = actionName.replace("ai.", "").replace(".", " ").title()
                actionContext = f"\nDOCUMENTS CREATED BY: {actionDescription} ({actionName})"

            # Build action parameters context
            actionParamsContext = ""
            if actionParameters and isinstance(actionParameters, dict) and len(actionParameters) > 0:
                # Filter out documentList and other large/redundant parameters for clarity
                relevantParams = {k: v for k, v in actionParameters.items()
                                  if k not in ['documentList', 'connections'] and v is not None}
                if relevantParams:
                    paramsJson = json.dumps(relevantParams, ensure_ascii=False, indent=2)
                    actionParamsContext = f"\nACTION PARAMETERS USED: {paramsJson}"

            # Extract validation metadata from documents (action-specific context)
            validationMetadataContext = ""
            if documents:
                metadataList = []
                for doc in documents:
                    metadata = getattr(doc, 'validationMetadata', None)
                    if metadata and isinstance(metadata, dict):
                        metadataList.append(metadata)

                if metadataList:
                    # Combine all metadata (usually just one document)
                    combinedMetadata = {}
                    for meta in metadataList:
                        combinedMetadata.update(meta)

                    if combinedMetadata:
                        metadataJson = json.dumps(combinedMetadata, ensure_ascii=False, indent=2)
                        validationMetadataContext = f"\nACTION VALIDATION METADATA: {metadataJson}"

            # Build action history context (for multi-step workflow validation)
            actionHistoryContext = ""
            if actionHistory and isinstance(actionHistory, list) and len(actionHistory) > 0:
                historyEntries = []
                for entry in actionHistory:
                    if isinstance(entry, dict):
                        action = entry.get('action', 'unknown')
                        params = entry.get('parameters', {}) or {}
                        step = entry.get('step', 0)
                        # Filter out documentList for clarity
                        relevantParams = {k: v for k, v in params.items() if k not in ['documentList', 'connections'] and v is not None}
                        paramsStr = json.dumps(relevantParams, ensure_ascii=False) if relevantParams else "{}"
                        historyEntries.append(f"Step {step}: {action} {paramsStr}")
                    elif isinstance(entry, str):
                        historyEntries.append(entry)

                if historyEntries:
                    actionHistoryContext = "\n\n=== ACTION HISTORY ===\n" + "\n".join(f"- {entry}" for entry in historyEntries)
                    actionHistoryContext += "\n\nIMPORTANT: This shows the complete workflow that produced the documents. For process-oriented criteria (e.g., 'internet search performed'), check ACTION HISTORY first. Document metadata may only reflect the LAST action, not the entire workflow."

            # Format success criteria for display with index numbers
            if successCriteria:
                criteriaDisplay = "\n".join([f"[{i}] {criterion}" for i, criterion in enumerate(successCriteria)])
            else:
                criteriaDisplay = "[]"

            promptBase = f"""TASK VALIDATION

=== TASK INFORMATION ===
{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMATS: {expectedFormats if expectedFormats else ['any']}{actionContext}{actionParamsContext}{validationMetadataContext}{actionHistoryContext}

=== VALIDATION INSTRUCTIONS ===

IMPORTANT: Different formats can represent the same data structure. Do not reject a format just because it differs from expected - check the structure summary for actual content.

VALIDATION RULES:
1. Use structure summary (sections, statistics, counts) as PRIMARY evidence for DATA-ORIENTED criteria. Trust structure over format claims.
2. Use ACTION HISTORY as PRIMARY evidence for PROCESS-ORIENTED criteria (e.g., "internet search performed", "sources cited"). Document metadata may only reflect the last action, not the entire workflow.
3. For each criterion in criteriaMapping: evaluate ONLY that criterion. Do not mention other criteria.
4. Priority: Data completeness > Format compatibility. Missing data is more critical than format mismatch.
5. Format understanding: Different formats can represent equivalent data structures. Focus on content, not format name.
6. Multi-step workflow awareness: If ACTION HISTORY is present, consider the workflow as a whole. Document metadata (e.g., extraction_method) describes how data was EXTRACTED in the last step, not necessarily how it was OBTAINED in the workflow.
7. Data availability assessment: If delivered documents do not contain required data, clearly indicate this in findings. Re-reading the same documents might not help.

VALIDATION STEPS:
- Check ACTION HISTORY first (if present) for PROCESS-ORIENTED criteria (e.g., "search performed", "sources used", "verification done")
- Check ACTION VALIDATION METADATA (if present) - this contains action-specific context for the LAST action only
- Check structure summary for quantities, counts, statistics (for DATA-ORIENTED criteria)
- Compare found values with required values from criteria
- If structure unavailable, use metadata only (format, filename, size)
- Classify gaps: missing_data (less than required), incomplete_data (partial), wrong_structure (wrong organization), wrong_format (format mismatch but data present)
- Assess if documents contain the required data: If structure shows documents lack the data, note this in findings - data must be generated or obtained elsewhere, not re-extracted from same documents

SCORING:
- Data complete + structure matches → qualityScore: 0.9-1.0
- Data complete but format issues → qualityScore: 0.7-0.9
- Missing/incomplete data → qualityScore: <0.7
- Format mismatch only (data present) → qualityScore: 0.6-0.7

SUGGESTIONS:
- ONE suggestion per UNMET criterion, ordered by criteriaMapping index
- Reference actual structure values found and required values
- Calculate quantitative gaps when numbers are available
- Be specific and actionable based on structure evidence

=== OUTPUT FORMAT ===
{{
  "overallSuccess": false,
  "qualityScore": 0.0,
  "dataTypeMatch": false,
  "formatMatch": false,
  "documentCount": {len(documents)},
  "criteriaMapping": [
    {{
      "index": 0,
      "criterion": "exact_criterion_text",
      "met": false,
      "reason": "explanation_for_this_criterion_only"
    }}
  ],
  "gapAnalysis": "Brief gap summary",
  "gapType": "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap",
  "structureComparison": {{
    "required": {{}},
    "found": {{}},
    "gap": {{}}
  }},
  "improvementSuggestions": ["One suggestion per unmet criterion"],
  "validationDetails": [
    {{
      "documentName": "name.ext",
      "issues": ["Specific issue"],
      "suggestions": ["Specific fix"]
    }}
  ]
}}

=== DATA ===

SUCCESS CRITERIA TO VALIDATE in criteriaMapping array:
{criteriaDisplay}

DELIVERED DOCUMENTS ({len(documents)} items):
"""

            # Calculate available space for document summaries
            # based on the model that will be used for validation
            basePromptSize = len(promptBase.encode('utf-8'))
            availableBytes = self._calculateAvailablePromptSpace(basePromptSize)

            # Analyze documents with size constraints
            documentSummaries = self._analyzeDocumentsWithSizeLimit(documents, availableBytes)

            # Build final prompt with summaries at the end
            documentsJson = json.dumps(documentSummaries, indent=2, ensure_ascii=False)
            validationPrompt = promptBase + documentsJson

            # Call AI service for validation
            response = await self.services.ai.callAiPlanning(
                prompt=validationPrompt,
                placeholders=None,
                debugType="contentvalidation"
            )

            if not response or not response.strip():
                logger.warning("AI validation returned empty response")
                raise ValueError("AI validation failed - empty response")

            # Clean and extract JSON from the response using the JSON extraction utility,
            # which handles nested structures and markdown code blocks correctly
            result = response.strip()
            logger.debug(f"AI validation response length: {len(result)}")

            # Extract JSON first
            extractedJson = self.services.utils.jsonExtractString(result)
            if not extractedJson:
                logger.debug(f"No JSON found in AI response: {result[:200]}...")
                logger.debug(f"Full AI response: {result}")
                raise ValueError("AI validation failed - no JSON in response")

            # Proactively fix Python-style booleans (False/True -> false/true) BEFORE parsing.
            # Word boundaries let this handle booleans in any context: standalone, in lists, or in dicts.
            normalizedJson = re.sub(r'\bFalse\b', 'false', extractedJson)
            normalizedJson = re.sub(r'\bTrue\b', 'true', normalizedJson)

            logger.debug(f"Extracted JSON (before normalization): {extractedJson[:200]}...")
            logger.debug(f"Normalized JSON (after boolean fix): {normalizedJson[:200]}...")

            # Now try to parse the normalized JSON
            try:
                aiResult = json.loads(normalizedJson)
                logger.info("AI validation JSON parsed successfully")
            except json.JSONDecodeError as json_error:
                logger.warning(f"AI validation invalid JSON after normalization: {str(json_error)}")
                logger.debug(f"JSON content that failed: {normalizedJson[:500]}...")
                raise ValueError(f"AI validation failed - invalid JSON: {str(json_error)}")

            overall = aiResult.get("overallSuccess")
            quality = aiResult.get("qualityScore")
            details = aiResult.get("validationDetails")
            gap = aiResult.get("gapAnalysis", "")
            improvements = aiResult.get("improvementSuggestions", [])
            gap_type = aiResult.get("gapType", "")
            structure_comp = aiResult.get("structureComparison", {})
            criteria_mapping = aiResult.get("criteriaMapping", [])

            # Normalize while keeping failures explicit
            normalized = {
                "overallSuccess": overall if isinstance(overall, bool) else None,
                "qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
                "documentCount": len(documentSummaries),
                "gapAnalysis": gap if gap else "",
                "gapType": gap_type if gap_type else "",
                "structureComparison": structure_comp if structure_comp else {},
                "criteriaMapping": criteria_mapping if isinstance(criteria_mapping, list) else [],
                "validationDetails": details if isinstance(details, list) else [{
                    "documentName": "AI Validation",
                    "gapAnalysis": gap
                }],
                "improvementSuggestions": improvements,
                "schemaCompliant": True,
                "originalType": "json",
                "missingFields": []
            }

            if normalized["overallSuccess"] is None:
                normalized["missingFields"].append("overallSuccess")
            if normalized["qualityScore"] is None:
                normalized["missingFields"].append("qualityScore")
            if normalized["missingFields"]:
                normalized["schemaCompliant"] = False

            return normalized

        except Exception as e:
            logger.error(f"AI validation failed: {str(e)}")
            raise

    def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
        """Create a standardized failed validation result"""
        return {
            "overallSuccess": False,
            "qualityScore": 0.0,
            "dataTypeMatch": False,
            "formatMatch": False,
            "documentCount": 0,
            "criteriaMapping": [],
            "gapAnalysis": errorMessage,
            "improvementSuggestions": [],
            "validationDetails": [],
            "schemaCompliant": True,
            "originalType": "error",
            "missingFields": [],
            "error": errorMessage
        }
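

# --- Usage sketch (illustrative only) ---
# `services`, the awaiting context, and the document objects below are assumptions inferred
# from the attributes this module accesses (services.ai.callAiPlanning,
# services.utils.jsonExtractString, doc.documentData, doc.documentName, ...).
#
#   validator = ContentValidator(services=services)
#   result = await validator.validateContent(
#       documents=deliveredDocuments,
#       intent={
#           "primaryGoal": "Create a product comparison table",
#           "dataType": "table",
#           "expectedFormats": ["xlsx"],
#           "successCriteria": ["Table contains at least 10 columns"],
#       },
#       actionName="ai.process",
#       actionParameters={"columnsPerRow": 10},
#   )
#   if not result["overallSuccess"]:
#       for suggestion in result["improvementSuggestions"]:
#           logger.info(f"Validation suggestion: {suggestion}")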