gateway/modules/workflows/processing/adaptive/contentValidator.py

452 lines
20 KiB
Python

# contentValidator.py
# Content validation for adaptive React mode
# Generic, document-aware validation system
import logging
import json
import base64
import re
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
# Configuration constants
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024 # 50KB threshold
PREVIEW_SAMPLE_SIZE = 1024 # 1KB preview for large documents
class ContentValidator:
"""Validates delivered content against user intent - generic and document-aware"""
def __init__(self, services=None, learningEngine=None):
self.services = services
self.learningEngine = learningEngine
async def validateContent(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None) -> Dict[str, Any]:
"""Validates delivered content against user intent using AI (single attempt; parse-or-fail)
Args:
documents: List of documents to validate
intent: Workflow-level intent dict (for format requirements)
taskStep: Optional TaskStep object (preferred source for objective)
"""
return await self._validateWithAI(documents, intent, taskStep)
def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
"""Generic document analysis - create simple summaries with metadata."""
summaries = []
for doc in documents:
try:
data = getattr(doc, 'documentData', None)
name = getattr(doc, 'documentName', 'Unknown')
mimeType = getattr(doc, 'mimeType', 'unknown')
formatExt = self._detectFormat(doc)
sizeInfo = self._calculateSize(doc)
# Simple preview: if it's dict/list, dump JSON; otherwise use string
preview = None
if data is not None:
if isinstance(data, (dict, list)):
preview = json.dumps(data, indent=2, ensure_ascii=False)
# Truncate if too large
if len(preview) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
preview = preview[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
else:
text = str(data)
if len(text) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
preview = text[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
else:
preview = text
summary = {
"name": name,
"mimeType": mimeType,
"format": formatExt,
"size": sizeInfo["readable"],
"preview": preview
}
summaries.append(summary)
except Exception as e:
logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
summaries.append({
"name": getattr(doc, 'documentName', 'Unknown'),
"mimeType": getattr(doc, 'mimeType', 'unknown'),
"format": "unknown",
"size": "0 B",
"preview": None,
"error": str(e)
})
return summaries
def _calculateAvailablePromptSpace(self, basePromptSizeBytes: int) -> int:
"""Calculate available space for document summaries based on model context length."""
try:
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
# Get available models
availableModels = modelRegistry.getAvailableModels()
# Create options for PLAN operation (what validation uses)
# Use default values for priority and processingMode (will use defaults from AiCallOptions)
from modules.datamodels.datamodelAi import PriorityEnum, ProcessingModeEnum
options = AiCallOptions(
operationType=OperationTypeEnum.PLAN,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC
)
# Get failover model list to find the model that will be used
failoverModels = modelSelector.getFailoverModelList("", "", options, availableModels)
if not failoverModels:
# Fallback: assume 16K tokens context (conservative)
logger.warning("No models available for space calculation, using fallback: 16K tokens")
maxBytes = 16 * 1024 * 4 # 16K tokens * 4 bytes per token
else:
# Use the first (best) model
model = failoverModels[0]
# Calculate 80% of context length in bytes (tokens * 4 bytes per token)
maxBytes = int(model.contextLength * 0.8 * 4)
# Available space = max - base prompt - safety margin (10%)
availableBytes = int((maxBytes - basePromptSizeBytes) * 0.9)
# Ensure minimum available space (at least 1KB)
availableBytes = max(availableBytes, 1024)
logger.debug(f"Prompt space calculation: base={basePromptSizeBytes} bytes, max={maxBytes} bytes, available={availableBytes} bytes")
return availableBytes
except Exception as e:
logger.warning(f"Error calculating available prompt space: {str(e)}, using fallback: 8KB")
# Fallback: assume 8KB available
return 8 * 1024
def _analyzeDocumentsWithSizeLimit(self, documents: List[Any], maxTotalBytes: int) -> List[Dict[str, Any]]:
"""
Analyze documents for validation - METADATA ONLY (no document content/previews).
For planning/validation, we only need metadata to assess format, type, and size compatibility.
"""
if not documents:
return []
summaries = []
for doc in documents:
try:
name = getattr(doc, 'documentName', 'Unknown')
mimeType = getattr(doc, 'mimeType', 'unknown')
formatExt = self._detectFormat(doc)
sizeInfo = self._calculateSize(doc)
# Only include metadata - NO document content/previews
# This keeps prompts small and focused on validation criteria
summary = {
"name": name,
"mimeType": mimeType,
"format": formatExt,
"size": sizeInfo["readable"]
}
summaries.append(summary)
except Exception as e:
logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
summaries.append({
"name": getattr(doc, 'documentName', 'Unknown'),
"mimeType": getattr(doc, 'mimeType', 'unknown'),
"format": "unknown",
"size": "0 B",
"error": str(e)
})
return summaries
def _detectFormat(self, doc: Any) -> str:
"""Extract format from filename extension (always use extension)"""
try:
docName = getattr(doc, 'documentName', '')
# Extract from filename extension
if docName and '.' in docName:
ext = docName.rsplit('.', 1)[1].lower()
return ext
return 'unknown'
except Exception as e:
logger.warning(f"Error detecting format: {str(e)}")
return 'unknown'
def _calculateSize(self, doc: Any) -> Dict[str, Any]:
"""Calculate document size in bytes and human-readable format"""
try:
if not hasattr(doc, 'documentData') or doc.documentData is None:
return {"bytes": 0, "readable": "0 B"}
data = doc.documentData
size_bytes = 0
if isinstance(data, str):
size_bytes = len(data.encode('utf-8'))
elif isinstance(data, bytes):
size_bytes = len(data)
elif isinstance(data, (dict, list)):
# Estimate JSON size
try:
json_str = json.dumps(data)
size_bytes = len(json_str.encode('utf-8'))
except:
size_bytes = len(str(data).encode('utf-8'))
else:
size_bytes = len(str(data).encode('utf-8'))
# Convert to human-readable format
readable = self._formatBytes(size_bytes)
return {"bytes": size_bytes, "readable": readable}
except Exception as e:
logger.warning(f"Error calculating size: {str(e)}")
return {"bytes": 0, "readable": "0 B"}
def _formatBytes(self, bytes: int) -> str:
"""Format bytes to human-readable string"""
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes < 1024.0:
return f"{bytes:.1f} {unit}"
bytes /= 1024.0
return f"{bytes:.1f} TB"
def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
"""
Generic format compatibility check.
- txt/md/html are text formats (compatible with each other)
- pdf/docx/xlsx are document formats (not compatible with each other)
- json/xml are structured formats
- images are image formats
"""
deliveredLower = deliveredFormat.lower()
expectedLower = expectedFormat.lower()
# Exact match
if deliveredLower == expectedLower:
return True
# Text formats are interchangeable
textFormats = ['txt', 'md', 'html', 'text', 'plain']
if deliveredLower in textFormats and expectedLower in textFormats:
return True
# Structured formats
if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
return True
# Document formats are NOT compatible with each other
documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
if deliveredLower in documentFormats and expectedLower in documentFormats:
return False # pdf ≠ docx
return False
async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any], taskStep: Optional[Any] = None) -> Dict[str, Any]:
"""AI-based comprehensive validation - generic approach"""
try:
if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
return self._createFailedValidationResult("AI service not available")
# Use taskStep.objective if available, otherwise fall back to intent.primaryGoal
taskObjective = None
if taskStep and hasattr(taskStep, 'objective'):
taskObjective = taskStep.objective
elif taskStep and isinstance(taskStep, dict):
taskObjective = taskStep.get('objective')
# Use taskStep format fields if available, otherwise fall back to intent
dataType = None
expectedFormat = None
if taskStep:
if hasattr(taskStep, 'dataType') and taskStep.dataType:
dataType = taskStep.dataType
elif isinstance(taskStep, dict):
dataType = taskStep.get('dataType')
if hasattr(taskStep, 'expectedFormat') and taskStep.expectedFormat:
expectedFormat = taskStep.expectedFormat
elif isinstance(taskStep, dict):
expectedFormat = taskStep.get('expectedFormat')
# Fallback to intent if taskStep format fields not available
if not dataType:
dataType = intent.get('dataType', 'unknown')
if not expectedFormat:
expectedFormat = intent.get('expectedFormat', 'unknown')
# Determine objective text and label
objectiveText = taskObjective if taskObjective else intent.get('primaryGoal', 'Unknown')
objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"
# Build prompt base WITHOUT document summaries first
# Use success criteria from taskStep if available, otherwise from intent
successCriteria = []
if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
successCriteria = taskStep.successCriteria
elif taskStep and isinstance(taskStep, dict):
successCriteria = taskStep.get('successCriteria', [])
else:
successCriteria = intent.get('successCriteria', [])
criteriaCount = len(successCriteria)
promptBase = f"""TASK VALIDATION
{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMAT: {expectedFormat}
SUCCESS CRITERIA ({criteriaCount} items): {successCriteria}
VALIDATION RULES:
IMPORTANT: You only have document METADATA (filename, format, size, mimeType) - NOT document content.
Validate based on metadata only:
1. Check if filenames are meaningful and match approximately the task objective
2. Check if delivered formats are compatible with expected format
3. Check if document sizes are reasonable for the task objective
4. Assess if filename and size combination suggests correct data type
5. Rate overall quality (0.0-1.0) based on metadata indicators
6. Identify specific gaps based on what the user requested (infer from filename, size, format - NOT content)
OUTPUT FORMAT - JSON ONLY (no prose):
{{
"overallSuccess": false,
"qualityScore": 0.0,
"dataTypeMatch": false,
"formatMatch": false,
"documentCount": {len(documents)},
"successCriteriaMet": {[False] * criteriaCount},
"gapAnalysis": "Describe what is missing or incorrect based on filename, size, format metadata",
"improvementSuggestions": ["General action to improve overall result"],
"validationDetails": [
{{
"documentName": "document.ext",
"issues": ["Issue inferred from metadata (e.g., filename doesn't match task, size too small for objective)"],
"suggestions": ["Specific fix based on metadata analysis"]
}}
]
}}
Field explanations:
- "improvementSuggestions": Overall actions to improve the entire result (general, high-level)
- "validationDetails[].suggestions": Specific fixes for each document's individual issues (document-specific, detailed)
- Do NOT use prefixes like "NEXT STEP:" - describe actions directly
DELIVERED DOCUMENTS ({len(documents)} items):
"""
# Calculate available space for document summaries
# Get the model that will be used for validation
basePromptSize = len(promptBase.encode('utf-8'))
availableBytes = self._calculateAvailablePromptSpace(basePromptSize)
# Analyze documents with size constraints
documentSummaries = self._analyzeDocumentsWithSizeLimit(documents, availableBytes)
# Build final prompt with summaries at the end
documentsJson = json.dumps(documentSummaries, indent=2)
validationPrompt = promptBase + documentsJson
# Call AI service for validation
response = await self.services.ai.callAiPlanning(
prompt=validationPrompt,
placeholders=None
)
if not response or not response.strip():
logger.warning("AI validation returned empty response")
raise ValueError("AI validation failed - empty response")
# Clean and extract JSON from response
result = response.strip()
logger.debug(f"AI validation response length: {len(result)}")
# Try to find JSON in the response with multiple strategies
# Strategy 1: Look for JSON in markdown code blocks
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', result, re.DOTALL)
if json_match:
result = json_match.group(1)
logger.debug(f"Extracted JSON from markdown code block: {result[:200]}...")
else:
# Strategy 2: Look for JSON object with proper structure
json_match = re.search(r'\{[^{}]*"overallSuccess"[^{}]*\}', result, re.DOTALL)
if not json_match:
# Strategy 3: Look for any JSON object
json_match = re.search(r'\{.*\}', result, re.DOTALL)
if json_match:
result = json_match.group(0)
logger.debug(f"Extracted JSON directly: {result[:200]}...")
else:
logger.debug(f"No JSON found in AI response: {result[:200]}...")
logger.debug(f"Full AI response: {result}")
raise ValueError("AI validation failed - no JSON in response")
try:
aiResult = json.loads(result)
logger.info("AI validation JSON parsed successfully")
overall = aiResult.get("overallSuccess")
quality = aiResult.get("qualityScore")
details = aiResult.get("validationDetails")
gap = aiResult.get("gapAnalysis", "")
criteria = aiResult.get("successCriteriaMet")
improvements = aiResult.get("improvementSuggestions", [])
# Normalize while keeping failures explicit
normalized = {
"overallSuccess": overall if isinstance(overall, bool) else None,
"qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
"documentCount": len(documentSummaries),
"validationDetails": details if isinstance(details, list) else [{
"documentName": "AI Validation",
"gapAnalysis": gap,
"successCriteriaMet": criteria if isinstance(criteria, list) else []
}],
"improvementSuggestions": improvements,
"schemaCompliant": True,
"originalType": "json",
"missingFields": []
}
if normalized["overallSuccess"] is None:
normalized["missingFields"].append("overallSuccess")
if normalized["qualityScore"] is None:
normalized["missingFields"].append("qualityScore")
if normalized["missingFields"]:
normalized["schemaCompliant"] = False
return normalized
except json.JSONDecodeError as json_error:
logger.warning(f"AI validation invalid JSON: {str(json_error)}")
logger.debug(f"JSON content: {result}")
raise
raise ValueError("AI validation failed - no response")
except Exception as e:
logger.error(f"AI validation failed: {str(e)}")
raise
def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
"""Create a standardized failed validation result"""
return {
"overallSuccess": False,
"qualityScore": 0.0,
"dataTypeMatch": False,
"formatMatch": False,
"documentCount": 0,
"successCriteriaMet": [],
"gapAnalysis": errorMessage,
"improvementSuggestions": [],
"validationDetails": [],
"schemaCompliant": True,
"originalType": "error",
"missingFields": [],
"error": errorMessage
}