# contentValidator.py
# Content validation for adaptive Dynamic mode
# Generic, document-aware validation system

import logging
import json
import re
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)

# Configuration constants
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024  # 50 KB threshold
PREVIEW_SAMPLE_SIZE = 1024  # 1 KB preview for large documents


class ContentValidator:
    """Validates delivered content against user intent - generic and document-aware"""

    def __init__(self, services=None, learningEngine=None):
        self.services = services
        self.learningEngine = learningEngine

    async def validateContent(self, documents: List[Any], intent: Dict[str, Any],
                              taskStep: Optional[Any] = None,
                              actionName: Optional[str] = None) -> Dict[str, Any]:
        """Validate delivered content against user intent using AI (single attempt; parse-or-fail).

        Args:
            documents: List of documents to validate
            intent: Workflow-level intent dict (for format requirements)
            taskStep: Optional TaskStep object (preferred source for the objective)
            actionName: Optional action name (e.g., "ai.process", "ai.webResearch") that created the documents
        """
        return await self._validateWithAI(documents, intent, taskStep, actionName)

    def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
        """Generic document analysis - create simple summaries with metadata and a content preview."""
        summaries = []
        for doc in documents:
            try:
                data = getattr(doc, 'documentData', None)
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                # Simple preview: if the data is a dict/list, dump it as JSON; otherwise use its string form
                preview = None
                if data is not None:
                    if isinstance(data, (dict, list)):
                        preview = json.dumps(data, indent=2, ensure_ascii=False)
                        # Truncate if too large
                        if len(preview) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = preview[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                    else:
                        text = str(data)
                        if len(text) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = text[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                        else:
                            preview = text

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"],
                    "preview": preview
                }
                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "preview": None,
                    "error": str(e)
                })
        return summaries
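    # Illustrative shape of one summary entry produced by _analyzeDocuments above
    # (the filename and values are hypothetical, not taken from real data):
    #   {
    #       "name": "report.md",
    #       "mimeType": "text/markdown",
    #       "format": "md",
    #       "size": "2.3 KB",
    #       "preview": "# Quarterly report\n..."
    #   }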
    def _calculateAvailablePromptSpace(self, basePromptSizeBytes: int) -> int:
        """Calculate the space available for document summaries based on the model's context length."""
        try:
            from modules.aicore.aicoreModelRegistry import modelRegistry
            from modules.aicore.aicoreModelSelector import modelSelector
            from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
            from modules.datamodels.datamodelAi import PriorityEnum, ProcessingModeEnum

            # Get available models
            availableModels = modelRegistry.getAvailableModels()

            # Create options for the PLAN operation (the operation validation uses),
            # with balanced priority and basic processing mode
            options = AiCallOptions(
                operationType=OperationTypeEnum.PLAN,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC
            )

            # Get the failover model list to find the model that will be used
            failoverModels = modelSelector.getFailoverModelList("", "", options, availableModels)

            if not failoverModels:
                # Fallback: assume a 16K-token context (conservative)
                logger.warning("No models available for space calculation, using fallback: 16K tokens")
                maxBytes = 16 * 1024 * 4  # 16K tokens * 4 bytes per token
            else:
                # Use the first (best) model
                model = failoverModels[0]
                # Use 80% of the context length, converted to bytes (tokens * 4 bytes per token)
                maxBytes = int(model.contextLength * 0.8 * 4)

            # Available space = (max - base prompt) minus a 10% safety margin
            availableBytes = int((maxBytes - basePromptSizeBytes) * 0.9)

            # Ensure a minimum amount of available space (at least 1 KB)
            availableBytes = max(availableBytes, 1024)

            logger.debug(f"Prompt space calculation: base={basePromptSizeBytes} bytes, max={maxBytes} bytes, available={availableBytes} bytes")
            return availableBytes
        except Exception as e:
            logger.warning(f"Error calculating available prompt space: {str(e)}, using fallback: 8KB")
            # Fallback: assume 8 KB available
            return 8 * 1024
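    # Illustrative arithmetic for the budget above (hypothetical numbers, assuming
    # a 32,768-token context model and a 6,000-byte base prompt):
    #   maxBytes       = 32768 * 0.8 * 4        ≈ 104,857 bytes
    #   availableBytes = (104857 - 6000) * 0.9  ≈  88,971 bytes
    # i.e. roughly 87 KB of the prompt budget would remain for document summaries.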
    def _analyzeDocumentsWithSizeLimit(self, documents: List[Any], maxTotalBytes: int) -> List[Dict[str, Any]]:
        """
        Analyze documents for validation - METADATA ONLY (no document content/previews).

        For planning/validation, only metadata is needed to assess format, type, and size
        compatibility. Because no previews are emitted, the maxTotalBytes budget is currently
        not consumed; it is kept for callers that compute an available prompt budget.
        """
        if not documents:
            return []

        summaries = []
        for doc in documents:
            try:
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                # Only include metadata - NO document content/previews.
                # This keeps prompts small and focused on the validation criteria.
                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"]
                }
                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "error": str(e)
                })
        return summaries

    def _detectFormat(self, doc: Any) -> str:
        """Extract the format from the filename extension (the extension is always used)."""
        try:
            docName = getattr(doc, 'documentName', '')

            # Extract from the filename extension
            if docName and '.' in docName:
                ext = docName.rsplit('.', 1)[1].lower()
                return ext

            return 'unknown'
        except Exception as e:
            logger.warning(f"Error detecting format: {str(e)}")
            return 'unknown'

    def _calculateSize(self, doc: Any) -> Dict[str, Any]:
        """Calculate the document size in bytes and as a human-readable string."""
        try:
            if not hasattr(doc, 'documentData') or doc.documentData is None:
                return {"bytes": 0, "readable": "0 B"}

            data = doc.documentData
            size_bytes = 0

            if isinstance(data, str):
                size_bytes = len(data.encode('utf-8'))
            elif isinstance(data, bytes):
                size_bytes = len(data)
            elif isinstance(data, (dict, list)):
                # Estimate the JSON size
                try:
                    json_str = json.dumps(data)
                    size_bytes = len(json_str.encode('utf-8'))
                except Exception:
                    size_bytes = len(str(data).encode('utf-8'))
            else:
                size_bytes = len(str(data).encode('utf-8'))

            # Convert to a human-readable format
            readable = self._formatBytes(size_bytes)

            return {"bytes": size_bytes, "readable": readable}
        except Exception as e:
            logger.warning(f"Error calculating size: {str(e)}")
            return {"bytes": 0, "readable": "0 B"}

    def _formatBytes(self, size: float) -> str:
        """Format a byte count as a human-readable string."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024.0:
                return f"{size:.1f} {unit}"
            size /= 1024.0
        return f"{size:.1f} TB"

    def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
        """
        Generic format compatibility check.

        - txt/md/html are text formats (compatible with each other)
        - pdf/docx/xlsx are document formats (NOT compatible with each other)
        - json/xml are structured formats (compatible with each other)
        - images are image formats
        """
        deliveredLower = deliveredFormat.lower()
        expectedLower = expectedFormat.lower()

        # Exact match
        if deliveredLower == expectedLower:
            return True

        # Text formats are interchangeable
        textFormats = ['txt', 'md', 'html', 'text', 'plain']
        if deliveredLower in textFormats and expectedLower in textFormats:
            return True

        # Structured formats
        if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
            return True

        # Document formats are NOT compatible with each other
        documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
        if deliveredLower in documentFormats and expectedLower in documentFormats:
            return False  # e.g. pdf ≠ docx

        return False
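    # Illustrative behaviour of _isFormatCompatible (hypothetical calls mirroring the rules above):
    #   _isFormatCompatible("md", "txt")   -> True   (both text formats)
    #   _isFormatCompatible("json", "xml") -> True   (both structured formats)
    #   _isFormatCompatible("pdf", "docx") -> False  (document formats are not interchangeable)
    #   _isFormatCompatible("png", "pdf")  -> False  (no shared format family)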
    async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any],
                              taskStep: Optional[Any] = None,
                              actionName: Optional[str] = None) -> Dict[str, Any]:
        """AI-based comprehensive validation - generic approach."""
        try:
            if not self.services or not hasattr(self.services, 'ai'):
                return self._createFailedValidationResult("AI service not available")

            # Use taskStep.objective if available, otherwise fall back to intent.primaryGoal
            taskObjective = None
            if taskStep and hasattr(taskStep, 'objective'):
                taskObjective = taskStep.objective
            elif taskStep and isinstance(taskStep, dict):
                taskObjective = taskStep.get('objective')

            # Use taskStep format fields if available, otherwise fall back to intent
            dataType = None
            expectedFormats = None
            if taskStep:
                if hasattr(taskStep, 'dataType') and taskStep.dataType:
                    dataType = taskStep.dataType
                elif isinstance(taskStep, dict):
                    dataType = taskStep.get('dataType')

                if hasattr(taskStep, 'expectedFormats') and taskStep.expectedFormats:
                    expectedFormats = taskStep.expectedFormats
                elif isinstance(taskStep, dict):
                    expectedFormats = taskStep.get('expectedFormats')

            # Fall back to intent if the taskStep format fields are not available
            if not dataType:
                dataType = intent.get('dataType', 'unknown')
            if not expectedFormats:
                expectedFormats = intent.get('expectedFormats', [])

            # Determine the objective text and its label
            objectiveText = taskObjective if taskObjective else intent.get('primaryGoal', 'Unknown')
            objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"

            # Build the prompt base WITHOUT document summaries first.
            # Use success criteria from taskStep if available, otherwise from intent.
            successCriteria = []
            if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
                successCriteria = taskStep.successCriteria
            elif taskStep and isinstance(taskStep, dict):
                successCriteria = taskStep.get('successCriteria', [])
            else:
                successCriteria = intent.get('successCriteria', [])
            criteriaCount = len(successCriteria)

            # Build the action name context
            actionContext = ""
            if actionName:
                actionContext = f"\nACTION THAT CREATED DOCUMENTS: {actionName}"

            promptBase = f"""TASK VALIDATION

{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMATS: {expectedFormats if expectedFormats else ['any']}
SUCCESS CRITERIA ({criteriaCount} items): {successCriteria}{actionContext}

VALIDATION RULES:
IMPORTANT: You only have document METADATA (filename, format, size, mimeType) - NOT document content.
Validate based on metadata only:
1. Check if filenames are APPROXIMATELY meaningful (generic names like "generated.docx" are acceptable if the format matches)
2. Check if the delivered formats are compatible with the expected format
3. Check if the document sizes are reasonable for the task objective
4. Assess if the filename and size combination suggests the correct data type
5. Rate overall quality (0.0-1.0) based on metadata indicators, with format matching being the most important
6. Identify specific gaps based on what the user requested (infer from filename, size, format - NOT content)

OUTPUT FORMAT - JSON ONLY (no prose):
{{
    "overallSuccess": false,
    "qualityScore": 0.0,
    "dataTypeMatch": false,
    "formatMatch": false,
    "documentCount": {len(documents)},
    "successCriteriaMet": {json.dumps([False] * criteriaCount)},
    "gapAnalysis": "Describe what is missing or incorrect based on the filename, size, and format metadata",
    "improvementSuggestions": ["General action to improve the overall result"],
    "validationDetails": [
        {{
            "documentName": "document.ext",
            "issues": ["Issue inferred from metadata (e.g., filename doesn't match the task, size too small for the objective)"],
            "suggestions": ["Specific fix based on metadata analysis"]
        }}
    ]
}}

Field explanations:
- "improvementSuggestions": Overall actions to improve the entire result (general, high-level)
- "validationDetails[].suggestions": Specific fixes for each document's individual issues (document-specific, detailed)
- Do NOT use prefixes like "NEXT STEP:" - describe actions directly

DELIVERED DOCUMENTS ({len(documents)} items):
"""

            # Calculate the space available for document summaries, based on the model
            # that will be used for validation
            basePromptSize = len(promptBase.encode('utf-8'))
            availableBytes = self._calculateAvailablePromptSpace(basePromptSize)

            # Analyze documents within the size constraints
            documentSummaries = self._analyzeDocumentsWithSizeLimit(documents, availableBytes)

            # Build the final prompt with the summaries at the end
            documentsJson = json.dumps(documentSummaries, indent=2)
            validationPrompt = promptBase + documentsJson

            # Call the AI service for validation
            response = await self.services.ai.callAiPlanning(
                prompt=validationPrompt,
                placeholders=None,
                debugType="contentvalidation"
            )

            if not response or not response.strip():
                logger.warning("AI validation returned empty response")
                raise ValueError("AI validation failed - empty response")

            # Clean and extract JSON from the response using the JSON extraction utility,
            # which handles nested structures and markdown code blocks correctly
            result = response.strip()
            logger.debug(f"AI validation response length: {len(result)}")

            # Extract the JSON first
            extractedJson = self.services.utils.jsonExtractString(result)
            if not extractedJson:
                logger.debug(f"No JSON found in AI response: {result[:200]}...")
                logger.debug(f"Full AI response: {result}")
                raise ValueError("AI validation failed - no JSON in response")
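            # Example of the normalization performed below (hypothetical model output):
            #   '{"overallSuccess": False, "successCriteriaMet": [True]}'
            # becomes
            #   '{"overallSuccess": false, "successCriteriaMet": [true]}'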
            # Proactively fix Python-style booleans (False/True -> false/true) BEFORE parsing.
            # The word-boundary pattern replaces bare booleans in any context: standalone,
            # in lists, in dicts, etc.
            normalizedJson = re.sub(r'\bFalse\b', 'false', extractedJson)
            normalizedJson = re.sub(r'\bTrue\b', 'true', normalizedJson)

            logger.debug(f"Extracted JSON (before normalization): {extractedJson[:200]}...")
            logger.debug(f"Normalized JSON (after boolean fix): {normalizedJson[:200]}...")

            # Now try to parse the normalized JSON
            try:
                aiResult = json.loads(normalizedJson)
                logger.info("AI validation JSON parsed successfully")
            except json.JSONDecodeError as json_error:
                logger.warning(f"AI validation invalid JSON after normalization: {str(json_error)}")
                logger.debug(f"JSON content that failed: {normalizedJson[:500]}...")
                raise ValueError(f"AI validation failed - invalid JSON: {str(json_error)}")

            overall = aiResult.get("overallSuccess")
            quality = aiResult.get("qualityScore")
            details = aiResult.get("validationDetails")
            gap = aiResult.get("gapAnalysis", "")
            criteria = aiResult.get("successCriteriaMet")
            improvements = aiResult.get("improvementSuggestions", [])

            # Normalize while keeping failures explicit
            normalized = {
                "overallSuccess": overall if isinstance(overall, bool) else None,
                "qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
                "documentCount": len(documentSummaries),
                "validationDetails": details if isinstance(details, list) else [{
                    "documentName": "AI Validation",
                    "gapAnalysis": gap,
                    "successCriteriaMet": criteria if isinstance(criteria, list) else []
                }],
                "improvementSuggestions": improvements,
                "schemaCompliant": True,
                "originalType": "json",
                "missingFields": []
            }
            if normalized["overallSuccess"] is None:
                normalized["missingFields"].append("overallSuccess")
            if normalized["qualityScore"] is None:
                normalized["missingFields"].append("qualityScore")
            if normalized["missingFields"]:
                normalized["schemaCompliant"] = False
            return normalized

        except Exception as e:
            logger.error(f"AI validation failed: {str(e)}")
            raise

    def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
        """Create a standardized failed-validation result."""
        return {
            "overallSuccess": False,
            "qualityScore": 0.0,
            "dataTypeMatch": False,
            "formatMatch": False,
            "documentCount": 0,
            "successCriteriaMet": [],
            "gapAnalysis": errorMessage,
            "improvementSuggestions": [],
            "validationDetails": [],
            "schemaCompliant": True,
            "originalType": "error",
            "missingFields": [],
            "error": errorMessage
        }
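

if __name__ == "__main__":  # pragma: no cover - illustrative sketch only
    # Minimal usage sketch with stubbed services. The attribute names
    # (services.ai.callAiPlanning, services.utils.jsonExtractString) and the document
    # attributes (documentName, documentData, mimeType) mirror the accesses made in
    # this module; the stub behaviour and the sample values below are invented here
    # purely for illustration and are NOT the real application implementation.
    import asyncio
    from types import SimpleNamespace

    async def _stubCallAiPlanning(prompt, placeholders=None, debugType=None):
        # Pretend the model judged the delivery successful.
        return json.dumps({
            "overallSuccess": True,
            "qualityScore": 0.9,
            "validationDetails": [],
            "gapAnalysis": "",
            "successCriteriaMet": [True],
            "improvementSuggestions": []
        })

    stubServices = SimpleNamespace(
        ai=SimpleNamespace(callAiPlanning=_stubCallAiPlanning),
        utils=SimpleNamespace(jsonExtractString=lambda text: text)
    )

    demoDoc = SimpleNamespace(
        documentName="summary.md",
        documentData="# Q3 summary\n...",
        mimeType="text/markdown"
    )

    async def _demo():
        validator = ContentValidator(services=stubServices)
        result = await validator.validateContent(
            documents=[demoDoc],
            intent={"primaryGoal": "Summarize Q3 results",
                    "expectedFormats": ["md"],
                    "successCriteria": ["Contains revenue figures"]},
            actionName="ai.process",
        )
        print(result)

    asyncio.run(_demo())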