# contentValidator.py
# Content validation for adaptive Dynamic mode
# Generic, document-aware validation system

import logging
import json
import base64
import re
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)

# Configuration constants
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024  # 50KB threshold
PREVIEW_SAMPLE_SIZE = 1024  # 1KB preview for large documents


class ContentValidator:
    """Validates delivered content against user intent - generic and document-aware"""

    def __init__(self, services=None, learningEngine=None):
        self.services = services
        self.learningEngine = learningEngine

    async def validateContent(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None) -> Dict[str, Any]:
        """Validates delivered content against user intent using AI (single attempt; parse-or-fail)

        Args:
            documents: List of documents to validate
            intent: Workflow-level intent dict (for format requirements)
            taskStep: Optional TaskStep object (preferred source for objective)
            actionName: Optional action name (e.g., "ai.process", "ai.webResearch") that created the documents
        """
        return await self._validateWithAI(documents, intent, taskStep, actionName)
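
    # Usage sketch (illustrative only): `appServices`, `step`, and the document
    # models are assumptions from the surrounding application, not part of this
    # module.
    #
    #     validator = ContentValidator(services=appServices)
    #     result = await validator.validateContent(
    #         documents=step.outputDocuments,
    #         intent={"primaryGoal": "Create employee workbook", "expectedFormats": ["xlsx"]},
    #         taskStep=step,
    #         actionName="ai.process",
    #     )
    #     if not result["overallSuccess"]:
    #         logger.info(f"Validation quality: {result['qualityScore']}")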

    def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
        """Generic document analysis - create simple summaries with metadata."""
        summaries = []
        for doc in documents:
            try:
                data = getattr(doc, 'documentData', None)
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                # Simple preview: if it's dict/list, dump JSON; otherwise use string
                preview = None
                if data is not None:
                    if isinstance(data, (dict, list)):
                        preview = json.dumps(data, indent=2, ensure_ascii=False)
                        # Truncate if too large
                        if len(preview) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = preview[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                    else:
                        text = str(data)
                        if len(text) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = text[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                        else:
                            preview = text

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"],
                    "preview": preview
                }
                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "preview": None,
                    "error": str(e)
                })
        return summaries

    def _calculateAvailablePromptSpace(self, basePromptSizeBytes: int) -> int:
        """Calculate available space for document summaries based on model context length."""
        try:
            from modules.aicore.aicoreModelRegistry import modelRegistry
            from modules.aicore.aicoreModelSelector import modelSelector
            from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum

            # Get available models
            availableModels = modelRegistry.getAvailableModels()

            # Create options for the PLAN operation (what validation uses),
            # with balanced priority and basic processing mode
            from modules.datamodels.datamodelAi import PriorityEnum, ProcessingModeEnum
            options = AiCallOptions(
                operationType=OperationTypeEnum.PLAN,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC
            )

            # Get failover model list to find the model that will be used
            failoverModels = modelSelector.getFailoverModelList("", "", options, availableModels)

            if not failoverModels:
                # Fallback: assume 16K tokens context (conservative)
                logger.warning("No models available for space calculation, using fallback: 16K tokens")
                maxBytes = 16 * 1024 * 4  # 16K tokens * 4 bytes per token
            else:
                # Use the first (best) model
                model = failoverModels[0]
                # Calculate 80% of context length in bytes (tokens * 4 bytes per token)
                maxBytes = int(model.contextLength * 0.8 * 4)

            # Available space = max - base prompt - safety margin (10%)
            availableBytes = int((maxBytes - basePromptSizeBytes) * 0.9)

            # Ensure minimum available space (at least 1KB)
            availableBytes = max(availableBytes, 1024)

            logger.debug(f"Prompt space calculation: base={basePromptSizeBytes} bytes, max={maxBytes} bytes, available={availableBytes} bytes")

            return availableBytes

        except Exception as e:
            logger.warning(f"Error calculating available prompt space: {str(e)}, using fallback: 8KB")
            # Fallback: assume 8KB available
            return 8 * 1024
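
    # Worked example of the calculation above (numbers are illustrative, not from
    # a specific model):
    #   model.contextLength = 32768 tokens
    #   maxBytes       = int(32768 * 0.8 * 4)        = 104857 bytes
    #   basePromptSize = 6000 bytes
    #   availableBytes = int((104857 - 6000) * 0.9)  = 88971 bytes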

    def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
        """Summarize JSON document structure for validation - extracts main objects, statistics, captions, and IDs."""
        try:
            if not isinstance(jsonData, dict):
                return {"type": "non-dict", "preview": str(jsonData)[:200]}

            summary = {
                "metadata": {},
                "sections": [],
                "statistics": {}
            }

            # Extract metadata
            metadata = jsonData.get("metadata", {})
            if metadata:
                summary["metadata"] = {
                    "title": metadata.get("title"),
                    "split_strategy": metadata.get("split_strategy"),
                    "extraction_method": metadata.get("extraction_method")
                }

            # Extract documents array (if present)
            documents = jsonData.get("documents", [])
            if documents:
                summary["statistics"]["documentCount"] = len(documents)
                # Process first document (most common case)
                if len(documents) > 0:
                    doc = documents[0]
                    docSections = doc.get("sections", [])
                    summary["statistics"]["sectionCount"] = len(docSections)

                    # Summarize sections
                    for section in docSections:
                        sectionSummary = {
                            "id": section.get("id"),
                            "content_type": section.get("content_type"),
                            "title": section.get("title"),
                            "order": section.get("order")
                        }

                        # For tables: extract caption and statistics
                        if section.get("content_type") == "table":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                tableElement = elements[0]
                                sectionSummary["caption"] = tableElement.get("caption")
                                headers = tableElement.get("headers", [])
                                rows = tableElement.get("rows", [])
                                sectionSummary["columnCount"] = len(headers)
                                sectionSummary["rowCount"] = len(rows)
                                sectionSummary["headers"] = headers  # Include headers for context

                        # For lists: extract item count
                        elif section.get("content_type") == "list":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                listElement = elements[0]
                                items = listElement.get("items", [])
                                sectionSummary["itemCount"] = len(items)

                        # For paragraphs/headings: extract text preview
                        elif section.get("content_type") in ["paragraph", "heading"]:
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                textElement = elements[0]
                                text = textElement.get("text", "")
                                if text:
                                    sectionSummary["textPreview"] = text[:100] + ("..." if len(text) > 100 else "")

                        summary["sections"].append(sectionSummary)
            else:
                # Fallback: check for sections directly in root
                sections = jsonData.get("sections", [])
                if sections:
                    summary["statistics"]["sectionCount"] = len(sections)
                    for section in sections:
                        sectionSummary = {
                            "id": section.get("id"),
                            "content_type": section.get("content_type"),
                            "title": section.get("title")
                        }

                        if section.get("content_type") == "table":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                tableElement = elements[0]
                                sectionSummary["caption"] = tableElement.get("caption")
                                headers = tableElement.get("headers", [])
                                rows = tableElement.get("rows", [])
                                sectionSummary["columnCount"] = len(headers)
                                sectionSummary["rowCount"] = len(rows)
                                sectionSummary["headers"] = headers

                        summary["sections"].append(sectionSummary)

            return summary

        except Exception as e:
            logger.warning(f"Error summarizing JSON structure: {str(e)}")
            return {"error": str(e), "type": "error"}

    def _analyzeDocumentsWithSizeLimit(self, documents: List[Any], maxTotalBytes: int) -> List[Dict[str, Any]]:
        """
        Analyze documents for validation - includes metadata AND JSON structure summary.
        JSON summary provides structure information (sections, tables with captions, IDs) without full content.
        """
        if not documents:
            return []

        summaries = []
        for doc in documents:
            try:
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"]
                }

                # Extract JSON structure summary if documentData is available
                data = getattr(doc, 'documentData', None)
                if data is not None:
                    if isinstance(data, dict):
                        # Summarize JSON structure
                        jsonSummary = self._summarizeJsonStructure(data)
                        summary["jsonStructure"] = jsonSummary
                    elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
                        # Handle list of documents
                        jsonSummary = self._summarizeJsonStructure(data[0])
                        summary["jsonStructure"] = jsonSummary

                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "error": str(e)
                })

        return summaries

    def _detectFormat(self, doc: Any) -> str:
        """Extract format from filename extension (always use extension)"""
        try:
            docName = getattr(doc, 'documentName', '')

            # Extract from filename extension
            if docName and '.' in docName:
                ext = docName.rsplit('.', 1)[1].lower()
                return ext

            return 'unknown'
        except Exception as e:
            logger.warning(f"Error detecting format: {str(e)}")
            return 'unknown'

    def _calculateSize(self, doc: Any) -> Dict[str, Any]:
        """Calculate document size in bytes and human-readable format"""
        try:
            if not hasattr(doc, 'documentData') or doc.documentData is None:
                return {"bytes": 0, "readable": "0 B"}

            data = doc.documentData
            size_bytes = 0

            if isinstance(data, str):
                size_bytes = len(data.encode('utf-8'))
            elif isinstance(data, bytes):
                size_bytes = len(data)
            elif isinstance(data, (dict, list)):
                # Estimate JSON size
                try:
                    json_str = json.dumps(data)
                    size_bytes = len(json_str.encode('utf-8'))
                except Exception:
                    size_bytes = len(str(data).encode('utf-8'))
            else:
                size_bytes = len(str(data).encode('utf-8'))

            # Convert to human-readable format
            readable = self._formatBytes(size_bytes)

            return {"bytes": size_bytes, "readable": readable}
        except Exception as e:
            logger.warning(f"Error calculating size: {str(e)}")
            return {"bytes": 0, "readable": "0 B"}

    def _formatBytes(self, numBytes: float) -> str:
        """Format bytes to human-readable string"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if numBytes < 1024.0:
                return f"{numBytes:.1f} {unit}"
            numBytes /= 1024.0
        return f"{numBytes:.1f} TB"

    def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
        """
        Generic format compatibility check.
        - txt/md/html are text formats (compatible with each other)
        - pdf/docx/xlsx are document formats (not compatible with each other)
        - json/xml are structured formats
        - images are image formats
        """
        deliveredLower = deliveredFormat.lower()
        expectedLower = expectedFormat.lower()

        # Exact match
        if deliveredLower == expectedLower:
            return True

        # Text formats are interchangeable
        textFormats = ['txt', 'md', 'html', 'text', 'plain']
        if deliveredLower in textFormats and expectedLower in textFormats:
            return True

        # Structured formats
        if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
            return True

        # Document formats are NOT compatible with each other
        documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
        if deliveredLower in documentFormats and expectedLower in documentFormats:
            return False  # pdf ≠ docx

        return False
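
    # A few example outcomes of the check above:
    #   _isFormatCompatible("md", "html")   -> True   (both text formats)
    #   _isFormatCompatible("json", "xml")  -> True   (both structured formats)
    #   _isFormatCompatible("pdf", "docx")  -> False  (distinct document formats)
    #   _isFormatCompatible("csv", "xlsx")  -> False  (no compatibility rule matches)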

    async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None, actionName: Optional[str] = None) -> Dict[str, Any]:
        """AI-based comprehensive validation - generic approach"""
        try:
            if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
                return self._createFailedValidationResult("AI service not available")

            # Use taskStep.objective if available, otherwise fall back to intent.primaryGoal
            taskObjective = None
            if taskStep and hasattr(taskStep, 'objective'):
                taskObjective = taskStep.objective
            elif taskStep and isinstance(taskStep, dict):
                taskObjective = taskStep.get('objective')

            # Use taskStep format fields if available, otherwise fall back to intent
            dataType = None
            expectedFormats = None
            if taskStep:
                if hasattr(taskStep, 'dataType') and taskStep.dataType:
                    dataType = taskStep.dataType
                elif isinstance(taskStep, dict):
                    dataType = taskStep.get('dataType')
                if hasattr(taskStep, 'expectedFormats') and taskStep.expectedFormats:
                    expectedFormats = taskStep.expectedFormats
                elif isinstance(taskStep, dict):
                    expectedFormats = taskStep.get('expectedFormats')

            # Fallback to intent if taskStep format fields not available
            if not dataType:
                dataType = intent.get('dataType', 'unknown')
            if not expectedFormats:
                expectedFormats = intent.get('expectedFormats', [])

            # Determine objective text and label
            objectiveText = taskObjective if taskObjective else intent.get('primaryGoal', 'Unknown')
            objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"

            # Build prompt base WITHOUT document summaries first
            # Use success criteria from taskStep if available, otherwise from intent
            successCriteria = []
            if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
                successCriteria = taskStep.successCriteria
            elif taskStep and isinstance(taskStep, dict):
                successCriteria = taskStep.get('successCriteria', [])
            else:
                successCriteria = intent.get('successCriteria', [])
            criteriaCount = len(successCriteria)

            # Build action name context with human-readable description
            actionContext = ""
            if actionName:
                # Convert action name to human-readable format
                actionDescription = actionName.replace("ai.", "").replace(".", " ").title()
                if "convert" in actionName.lower():
                    actionDescription = "Document format conversion"
                elif "generate" in actionName.lower() or "create" in actionName.lower():
                    actionDescription = "Document generation"
                elif "extract" in actionName.lower():
                    actionDescription = "Content extraction"
                elif "process" in actionName.lower():
                    actionDescription = "Content processing"
                actionContext = f"\nDOCUMENTS CREATED BY: {actionDescription} ({actionName})"

            # Format success criteria for display
            criteriaDisplay = json.dumps(successCriteria, ensure_ascii=False) if successCriteria else "[]"

            # Build successCriteriaMet example - show proper array format
            criteriaMetExample = json.dumps([False] * criteriaCount) if criteriaCount > 0 else "[]"
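            # e.g. with 3 success criteria: criteriaMetExample == "[false, false, false]"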

            promptBase = f"""TASK VALIDATION

{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMATS: {expectedFormats if expectedFormats else ['any']}
SUCCESS CRITERIA ({criteriaCount} items): {criteriaDisplay}{actionContext}

VALIDATION RULES:
You have document METADATA (filename, format, size, mimeType) AND JSON STRUCTURE SUMMARY (sections, tables with captions, IDs, statistics).

What CAN be validated:
- Format compatibility: Check if delivered format matches expected format (e.g., xlsx matches xlsx, docx matches docx)
- Filename appropriateness: Check if filename suggests correct content type (e.g., "employee_data.xlsx" suggests employee data)
- Document structure: Use JSON structure summary to validate:
  * Number of sections/tables matches requirements
  * Table captions are present and meaningful (if task requires specific tables)
  * Section IDs are present (if needed)
  * Table row/column counts are reasonable for the task
  * Section types match expectations (e.g., task asks for tables, check if tables are present)
- Document count: Check if number of documents matches expectations
- Basic size sanity: Only flag size if EXTREMELY small (<1KB) or suspiciously large for the task type

What CANNOT be validated:
- Content quality, accuracy, or completeness of actual data values
- Whether specific data values are correct
- Whether formatting details are perfect
- Whether content meets very detailed requirements that require reading actual data

Validation approach:
1. Format matching is PRIMARY - if format matches, qualityScore should be at least 0.7
2. Structure validation using JSON summary is SECONDARY - check if structure matches requirements:
   - If task asks for "two sheets" or "two tables", verify section count or table count from JSON summary
   - If task asks for specific table captions, verify they exist in JSON summary
   - If task asks for specific structure (e.g., "Employees table" and "Departments table"), verify section titles/captions match
3. Filename appropriateness is TERTIARY - meaningful filenames increase score
4. Size checks should be VERY conservative - only flag if clearly wrong (e.g., 0 bytes or <1KB for complex documents)
5. For successCriteriaMet: Evaluate each criterion using metadata AND JSON structure:
   - Format-related criteria: Can be evaluated (e.g., "Excel file" → check format)
   - Structure-related criteria: Can be evaluated using JSON summary (e.g., "two sheets" → check section count, "table with caption X" → check JSON summary for caption)
   - Content-related criteria: Set to false if they cannot be determined from structure (don't guess data values)
6. Only suggest improvements if there are CLEAR issues (wrong format, missing structure elements, etc.)
7. If format matches, structure matches requirements (from JSON summary), and filename is reasonable, qualityScore should be 0.8-1.0

OUTPUT FORMAT - JSON ONLY (no prose):
{{
  "overallSuccess": false,
  "qualityScore": 0.0,
  "dataTypeMatch": false,
  "formatMatch": false,
  "documentCount": {len(documents)},
  "successCriteriaMet": {criteriaMetExample},
  "gapAnalysis": "Describe what is missing or incorrect based ONLY on metadata and the JSON structure summary (format, filename, count, size, sections, captions). If format, structure, and filename look right, state that validation is limited to metadata and structure.",
  "improvementSuggestions": [],
  "validationDetails": [
    {{
      "documentName": "document.ext",
      "issues": ["Issue inferred from metadata or JSON structure summary ONLY"],
      "suggestions": ["Specific fix based on metadata/structure analysis"]
    }}
  ]
}}

Field explanations:
- "successCriteriaMet": Array of {criteriaCount} boolean values, one per success criterion. Evaluate each based ONLY on metadata and the JSON structure summary. If a criterion cannot be evaluated from them, set to false and explain in gapAnalysis.
- "qualityScore": 0.0-1.0 score. If format matches and filename is reasonable, score should be 0.8-1.0. Only reduce the score for clear metadata or structure issues.
- "overallSuccess": true if format matches AND (qualityScore >= 0.8 OR no clear metadata/structure issues)
- "improvementSuggestions": Only include if there are CLEAR metadata or structure issues that can be fixed. If format matches and filename is reasonable, leave empty array [].
- "gapAnalysis": Be honest about limitations - if validation is limited to metadata and structure (no access to actual data values), state this clearly.
- IMPORTANT: Do NOT suggest improvements based on assumptions about content quality. Only suggest fixes for clear metadata or structure problems (wrong format, missing documents, missing tables, etc.).

DELIVERED DOCUMENTS ({len(documents)} items):
"""

            # Calculate available space for document summaries
            # Get the model that will be used for validation
            basePromptSize = len(promptBase.encode('utf-8'))
            availableBytes = self._calculateAvailablePromptSpace(basePromptSize)

            # Analyze documents with size constraints
            documentSummaries = self._analyzeDocumentsWithSizeLimit(documents, availableBytes)

            # Build final prompt with summaries at the end
            # Format document summaries with JSON structure prominently displayed
            documentsJson = json.dumps(documentSummaries, indent=2, ensure_ascii=False)
            validationPrompt = promptBase + documentsJson + "\n\nNOTE: The 'jsonStructure' field in each document summary contains the document structure (sections, tables with captions, IDs, statistics). Use this to validate structure requirements like number of tables, table captions, section types, etc."

            # Call AI service for validation
            response = await self.services.ai.callAiPlanning(
                prompt=validationPrompt,
                placeholders=None,
                debugType="contentvalidation"
            )

            if not response or not response.strip():
                logger.warning("AI validation returned empty response")
                raise ValueError("AI validation failed - empty response")

            # Clean and extract JSON from response using proper JSON extraction utility
            # This handles nested structures and markdown code blocks correctly
            result = response.strip()
            logger.debug(f"AI validation response length: {len(result)}")

            # Extract JSON first
            extractedJson = self.services.utils.jsonExtractString(result)
            if not extractedJson:
                logger.debug(f"No JSON found in AI response: {result[:200]}...")
                logger.debug(f"Full AI response: {result}")
                raise ValueError("AI validation failed - no JSON in response")

            # Proactively fix Python-style booleans (False/True -> false/true) BEFORE parsing
            # This handles booleans in any context: standalone, in lists, in dicts, etc.
            # Use word boundaries but also handle cases where booleans are in brackets/arrays
            # Replace False/True regardless of context (word boundary handles string matching correctly)
            normalizedJson = re.sub(r'\bFalse\b', 'false', extractedJson)
            normalizedJson = re.sub(r'\bTrue\b', 'true', normalizedJson)

            logger.debug(f"Extracted JSON (before normalization): {extractedJson[:200]}...")
            logger.debug(f"Normalized JSON (after boolean fix): {normalizedJson[:200]}...")

            # Now try to parse the normalized JSON
            try:
                aiResult = json.loads(normalizedJson)
                logger.info("AI validation JSON parsed successfully")
            except json.JSONDecodeError as json_error:
                logger.warning(f"AI validation invalid JSON after normalization: {str(json_error)}")
                logger.debug(f"JSON content that failed: {normalizedJson[:500]}...")
                raise ValueError(f"AI validation failed - invalid JSON: {str(json_error)}")

            overall = aiResult.get("overallSuccess")
            quality = aiResult.get("qualityScore")
            details = aiResult.get("validationDetails")
            gap = aiResult.get("gapAnalysis", "")
            criteria = aiResult.get("successCriteriaMet")
            improvements = aiResult.get("improvementSuggestions", [])

            # Normalize while keeping failures explicit
            normalized = {
                "overallSuccess": overall if isinstance(overall, bool) else None,
                "qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
                "documentCount": len(documentSummaries),
                "validationDetails": details if isinstance(details, list) else [{
                    "documentName": "AI Validation",
                    "gapAnalysis": gap,
                    "successCriteriaMet": criteria if isinstance(criteria, list) else []
                }],
                "improvementSuggestions": improvements,
                "schemaCompliant": True,
                "originalType": "json",
                "missingFields": []
            }

            if normalized["overallSuccess"] is None:
                normalized["missingFields"].append("overallSuccess")
            if normalized["qualityScore"] is None:
                normalized["missingFields"].append("qualityScore")
            if normalized["missingFields"]:
                normalized["schemaCompliant"] = False

            return normalized

        except Exception as e:
            logger.error(f"AI validation failed: {str(e)}")
            raise

    def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
        """Create a standardized failed validation result"""
        return {
            "overallSuccess": False,
            "qualityScore": 0.0,
            "dataTypeMatch": False,
            "formatMatch": False,
            "documentCount": 0,
            "successCriteriaMet": [],
            "gapAnalysis": errorMessage,
            "improvementSuggestions": [],
            "validationDetails": [],
            "schemaCompliant": True,
            "originalType": "error",
            "missingFields": [],
            "error": errorMessage
        }