# contentValidator.py
# Content validation for adaptive Dynamic mode
# Generic, document-aware validation system

import logging
import json
import base64
import re
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)

# Configuration constants
MAX_CONTENT_SIZE_FOR_FULL_PREVIEW = 50 * 1024  # 50KB threshold
PREVIEW_SAMPLE_SIZE = 1024  # 1KB preview for large documents


class ContentValidator:
    """Validates delivered content against user intent - generic and document-aware"""

    def __init__(self, services=None, learningEngine=None):
        self.services = services
        self.learningEngine = learningEngine

    async def validateContent(self, documents: List[Any], intent: Dict[str, Any],
                              taskStep: Optional[Any] = None,
                              actionName: Optional[str] = None,
                              actionParameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Validates delivered content against user intent using AI (single attempt; parse-or-fail)

        Args:
            documents: List of documents to validate
            intent: Workflow-level intent dict (for format requirements)
            taskStep: Optional TaskStep object (preferred source for objective)
            actionName: Optional action name (e.g., "ai.process", "ai.webResearch") that created the documents
            actionParameters: Optional action parameters used during execution
                (e.g., {"columnsPerRow": 10, "researchDepth": "deep"})
        """
        return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters)

    def _analyzeDocuments(self, documents: List[Any]) -> List[Dict[str, Any]]:
        """Generic document analysis - create simple summaries with metadata."""
        summaries = []
        for doc in documents:
            try:
                data = getattr(doc, 'documentData', None)
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                # Simple preview: if it's dict/list, dump JSON; otherwise use string
                preview = None
                if data is not None:
                    if isinstance(data, (dict, list)):
                        preview = json.dumps(data, indent=2, ensure_ascii=False)
                        # Truncate if too large
                        if len(preview) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = preview[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                    else:
                        text = str(data)
                        if len(text) > MAX_CONTENT_SIZE_FOR_FULL_PREVIEW:
                            preview = text[:PREVIEW_SAMPLE_SIZE] + f"\n\n[Truncated - {self._formatBytes(sizeInfo['bytes'])} total]"
                        else:
                            preview = text

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"],
                    "preview": preview
                }
                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "preview": None,
                    "error": str(e)
                })
        return summaries
    def _calculateAvailablePromptSpace(self, basePromptSizeBytes: int) -> int:
        """Calculate available space for document summaries based on model context length."""
        try:
            from modules.aicore.aicoreModelRegistry import modelRegistry
            from modules.aicore.aicoreModelSelector import modelSelector
            from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum

            # Get available models
            availableModels = modelRegistry.getAvailableModels()

            # Create options for PLAN operation (what validation uses)
            # Use default values for priority and processingMode (will use defaults from AiCallOptions)
            from modules.datamodels.datamodelAi import PriorityEnum, ProcessingModeEnum
            options = AiCallOptions(
                operationType=OperationTypeEnum.PLAN,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC
            )

            # Get failover model list to find the model that will be used
            failoverModels = modelSelector.getFailoverModelList("", "", options, availableModels)
            if not failoverModels:
                # Fallback: assume 16K tokens context (conservative)
                logger.warning("No models available for space calculation, using fallback: 16K tokens")
                maxBytes = 16 * 1024 * 4  # 16K tokens * 4 bytes per token
            else:
                # Use the first (best) model
                model = failoverModels[0]
                # Calculate 80% of context length in bytes (tokens * 4 bytes per token)
                maxBytes = int(model.contextLength * 0.8 * 4)

            # Available space = max - base prompt - safety margin (10%)
            availableBytes = int((maxBytes - basePromptSizeBytes) * 0.9)

            # Ensure minimum available space (at least 1KB)
            availableBytes = max(availableBytes, 1024)

            logger.debug(f"Prompt space calculation: base={basePromptSizeBytes} bytes, max={maxBytes} bytes, available={availableBytes} bytes")
            return availableBytes
        except Exception as e:
            logger.warning(f"Error calculating available prompt space: {str(e)}, using fallback: 8KB")
            # Fallback: assume 8KB available
            return 8 * 1024
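    # Worked example of the space calculation above (illustrative figures only, not a
    # guarantee of any particular model): for a hypothetical model with a 32,768-token
    # context and a 6,144-byte base prompt:
    #   maxBytes       = int(32768 * 0.8 * 4)       = 104857
    #   availableBytes = int((104857 - 6144) * 0.9) = 88841
    # i.e. roughly 87 KB of the prompt budget remains for document summaries.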
    def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
        """Summarize JSON document structure for validation - extracts main objects, statistics, captions, and IDs."""
        try:
            if not isinstance(jsonData, dict):
                return {"type": "non-dict", "preview": str(jsonData)[:200]}

            summary = {
                "metadata": {},
                "sections": [],
                "statistics": {}
            }

            # Extract metadata
            metadata = jsonData.get("metadata", {})
            if metadata:
                summary["metadata"] = {
                    "title": metadata.get("title"),
                    "split_strategy": metadata.get("split_strategy"),
                    "extraction_method": metadata.get("extraction_method")
                }

            # Extract documents array (if present)
            documents = jsonData.get("documents", [])
            if documents:
                summary["statistics"]["documentCount"] = len(documents)

                # Process first document (most common case)
                if len(documents) > 0:
                    doc = documents[0]
                    docSections = doc.get("sections", [])
                    summary["statistics"]["sectionCount"] = len(docSections)

                    # Summarize sections
                    for section in docSections:
                        sectionSummary = {
                            "id": section.get("id"),
                            "content_type": section.get("content_type"),
                            "title": section.get("title"),
                            "order": section.get("order")
                        }

                        # For tables: extract caption and statistics
                        if section.get("content_type") == "table":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                tableElement = elements[0]
                                sectionSummary["caption"] = tableElement.get("caption")
                                headers = tableElement.get("headers", [])
                                rows = tableElement.get("rows", [])
                                sectionSummary["columnCount"] = len(headers)
                                sectionSummary["rowCount"] = len(rows)
                                sectionSummary["headers"] = headers  # Include headers for context

                        # For lists: extract item count
                        elif section.get("content_type") == "list":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                listElement = elements[0]
                                items = listElement.get("items", [])
                                sectionSummary["itemCount"] = len(items)

                        # For paragraphs/headings: extract text preview
                        elif section.get("content_type") in ["paragraph", "heading"]:
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                textElement = elements[0]
                                text = textElement.get("text", "")
                                if text:
                                    sectionSummary["textPreview"] = text[:100] + ("..." if len(text) > 100 else "")

                        summary["sections"].append(sectionSummary)
            else:
                # Fallback: check for sections directly in root
                sections = jsonData.get("sections", [])
                if sections:
                    summary["statistics"]["sectionCount"] = len(sections)
                    for section in sections:
                        sectionSummary = {
                            "id": section.get("id"),
                            "content_type": section.get("content_type"),
                            "title": section.get("title")
                        }
                        if section.get("content_type") == "table":
                            elements = section.get("elements", [])
                            if elements and isinstance(elements, list) and len(elements) > 0:
                                tableElement = elements[0]
                                sectionSummary["caption"] = tableElement.get("caption")
                                headers = tableElement.get("headers", [])
                                rows = tableElement.get("rows", [])
                                sectionSummary["columnCount"] = len(headers)
                                sectionSummary["rowCount"] = len(rows)
                                sectionSummary["headers"] = headers
                        summary["sections"].append(sectionSummary)

            return summary
        except Exception as e:
            logger.warning(f"Error summarizing JSON structure: {str(e)}")
            return {"error": str(e), "type": "error"}

    def _analyzeDocumentsWithSizeLimit(self, documents: List[Any], maxTotalBytes: int) -> List[Dict[str, Any]]:
        """
        Analyze documents for validation - includes metadata AND JSON structure summary.
        JSON summary provides structure information (sections, tables with captions, IDs) without full content.
        """
        if not documents:
            return []

        summaries = []
        for doc in documents:
            try:
                name = getattr(doc, 'documentName', 'Unknown')
                mimeType = getattr(doc, 'mimeType', 'unknown')
                formatExt = self._detectFormat(doc)
                sizeInfo = self._calculateSize(doc)

                summary = {
                    "name": name,
                    "mimeType": mimeType,
                    "format": formatExt,
                    "size": sizeInfo["readable"]
                }

                # Extract JSON structure summary - prioritize sourceJson for rendered documents
                sourceJson = getattr(doc, 'sourceJson', None)
                data = getattr(doc, 'documentData', None)

                if sourceJson and isinstance(sourceJson, dict):
                    # Use source JSON for structure analysis (for rendered documents like xlsx/docx/pdf)
                    jsonSummary = self._summarizeJsonStructure(sourceJson)
                    summary["jsonStructure"] = jsonSummary
                elif data is not None:
                    # Fallback: try to parse documentData as JSON (for non-rendered documents)
                    if isinstance(data, dict):
                        # Summarize JSON structure
                        jsonSummary = self._summarizeJsonStructure(data)
                        summary["jsonStructure"] = jsonSummary
                    elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
                        # Handle list of documents
                        jsonSummary = self._summarizeJsonStructure(data[0])
                        summary["jsonStructure"] = jsonSummary

                summaries.append(summary)
            except Exception as e:
                logger.warning(f"Error analyzing document {getattr(doc, 'documentName', 'Unknown')}: {str(e)}")
                summaries.append({
                    "name": getattr(doc, 'documentName', 'Unknown'),
                    "mimeType": getattr(doc, 'mimeType', 'unknown'),
                    "format": "unknown",
                    "size": "0 B",
                    "error": str(e)
                })
        return summaries
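    # Illustrative output shape (hypothetical document, values abridged): for a source
    # JSON whose "documents"[0] holds a single table section with 3 headers and 42 rows,
    # the entry appended to `summaries` would look roughly like:
    #   {
    #     "name": "report.xlsx",
    #     "mimeType": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    #     "format": "xlsx",
    #     "size": "12.3 KB",
    #     "jsonStructure": {
    #       "metadata": {"title": "...", "split_strategy": "...", "extraction_method": "..."},
    #       "statistics": {"documentCount": 1, "sectionCount": 1},
    #       "sections": [{"id": "...", "content_type": "table", "title": "...", "order": 1,
    #                     "caption": "...", "columnCount": 3, "rowCount": 42,
    #                     "headers": ["...", "...", "..."]}]
    #     }
    #   }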
    def _detectFormat(self, doc: Any) -> str:
        """Extract format from filename extension (always use extension)"""
        try:
            docName = getattr(doc, 'documentName', '')
            # Extract from filename extension
            if docName and '.' in docName:
                ext = docName.rsplit('.', 1)[1].lower()
                return ext
            return 'unknown'
        except Exception as e:
            logger.warning(f"Error detecting format: {str(e)}")
            return 'unknown'

    def _calculateSize(self, doc: Any) -> Dict[str, Any]:
        """Calculate document size in bytes and human-readable format"""
        try:
            if not hasattr(doc, 'documentData') or doc.documentData is None:
                return {"bytes": 0, "readable": "0 B"}

            data = doc.documentData
            size_bytes = 0

            if isinstance(data, str):
                size_bytes = len(data.encode('utf-8'))
            elif isinstance(data, bytes):
                size_bytes = len(data)
            elif isinstance(data, (dict, list)):
                # Estimate JSON size
                try:
                    json_str = json.dumps(data)
                    size_bytes = len(json_str.encode('utf-8'))
                except Exception:
                    size_bytes = len(str(data).encode('utf-8'))
            else:
                size_bytes = len(str(data).encode('utf-8'))

            # Convert to human-readable format
            readable = self._formatBytes(size_bytes)
            return {"bytes": size_bytes, "readable": readable}
        except Exception as e:
            logger.warning(f"Error calculating size: {str(e)}")
            return {"bytes": 0, "readable": "0 B"}

    def _formatBytes(self, bytes: int) -> str:
        """Format bytes to human-readable string"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes < 1024.0:
                return f"{bytes:.1f} {unit}"
            bytes /= 1024.0
        return f"{bytes:.1f} TB"

    def _isFormatCompatible(self, deliveredFormat: str, expectedFormat: str) -> bool:
        """
        Generic format compatibility check.
        - txt/md/html are text formats (compatible with each other)
        - pdf/docx/xlsx are document formats (not compatible with each other)
        - json/xml are structured formats
        - images are image formats
        """
        deliveredLower = deliveredFormat.lower()
        expectedLower = expectedFormat.lower()

        # Exact match
        if deliveredLower == expectedLower:
            return True

        # Text formats are interchangeable
        textFormats = ['txt', 'md', 'html', 'text', 'plain']
        if deliveredLower in textFormats and expectedLower in textFormats:
            return True

        # Structured formats
        if deliveredLower in ['json', 'xml'] and expectedLower in ['json', 'xml']:
            return True

        # Document formats are NOT compatible with each other
        documentFormats = ['pdf', 'docx', 'xlsx', 'pptx']
        if deliveredLower in documentFormats and expectedLower in documentFormats:
            return False  # pdf ≠ docx

        return False
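    # Illustrative behaviour of the compatibility check above:
    #   _isFormatCompatible("md", "txt")   -> True   (both text formats)
    #   _isFormatCompatible("json", "xml") -> True   (both structured formats)
    #   _isFormatCompatible("pdf", "docx") -> False  (distinct document formats)
    #   _isFormatCompatible("csv", "xlsx") -> False  (no rule matches, default is False)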
    async def _validateWithAI(self, documents: List[Any], intent: Dict[str, Any],
                              taskStep: Optional[Any] = None,
                              actionName: Optional[str] = None,
                              actionParameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """AI-based comprehensive validation - generic approach"""
        try:
            if not hasattr(self, 'services') or not self.services or not hasattr(self.services, 'ai'):
                return self._createFailedValidationResult("AI service not available")

            # Use taskStep.objective if available, otherwise fall back to intent.primaryGoal
            taskObjective = None
            if taskStep and hasattr(taskStep, 'objective'):
                taskObjective = taskStep.objective
            elif taskStep and isinstance(taskStep, dict):
                taskObjective = taskStep.get('objective')

            # Use taskStep format fields if available, otherwise fall back to intent
            dataType = None
            expectedFormats = None
            if taskStep:
                if hasattr(taskStep, 'dataType') and taskStep.dataType:
                    dataType = taskStep.dataType
                elif isinstance(taskStep, dict):
                    dataType = taskStep.get('dataType')
                if hasattr(taskStep, 'expectedFormats') and taskStep.expectedFormats:
                    expectedFormats = taskStep.expectedFormats
                elif isinstance(taskStep, dict):
                    expectedFormats = taskStep.get('expectedFormats')

            # Fallback to intent if taskStep format fields not available
            if not dataType:
                dataType = intent.get('dataType', 'unknown')
            if not expectedFormats:
                expectedFormats = intent.get('expectedFormats', [])

            # Determine objective text and label
            objectiveText = taskObjective if taskObjective else intent.get('primaryGoal', 'Unknown')
            objectiveLabel = "TASK OBJECTIVE" if taskObjective else "USER REQUEST"

            # Build prompt base WITHOUT document summaries first
            # Use success criteria from taskStep if available, otherwise from intent
            successCriteria = []
            if taskStep and hasattr(taskStep, 'successCriteria') and taskStep.successCriteria:
                successCriteria = taskStep.successCriteria
            elif taskStep and isinstance(taskStep, dict):
                successCriteria = taskStep.get('successCriteria', [])
            else:
                successCriteria = intent.get('successCriteria', [])
            criteriaCount = len(successCriteria)

            # Build action name context with human-readable description
            actionContext = ""
            if actionName:
                # Convert action name to human-readable format
                actionDescription = actionName.replace("ai.", "").replace(".", " ").title()
                actionContext = f"\nDOCUMENTS CREATED BY: {actionDescription} ({actionName})"

            # Build action parameters context
            actionParamsContext = ""
            if actionParameters and isinstance(actionParameters, dict) and len(actionParameters) > 0:
                # Filter out documentList and other large/redundant parameters for clarity
                relevantParams = {k: v for k, v in actionParameters.items()
                                  if k not in ['documentList', 'connections'] and v is not None}
                if relevantParams:
                    paramsJson = json.dumps(relevantParams, ensure_ascii=False, indent=2)
                    actionParamsContext = f"\nACTION PARAMETERS USED: {paramsJson}"

            # Extract validation metadata from documents (action-specific context)
            validationMetadataContext = ""
            if documents:
                metadataList = []
                for doc in documents:
                    metadata = getattr(doc, 'validationMetadata', None)
                    if metadata and isinstance(metadata, dict):
                        metadataList.append(metadata)
                if metadataList:
                    # Combine all metadata (usually just one document)
                    combinedMetadata = {}
                    for meta in metadataList:
                        combinedMetadata.update(meta)
                    if combinedMetadata:
                        metadataJson = json.dumps(combinedMetadata, ensure_ascii=False, indent=2)
                        validationMetadataContext = f"\nACTION VALIDATION METADATA: {metadataJson}"

            # Format success criteria for display with index numbers
            if successCriteria:
                criteriaDisplay = "\n".join([f"[{i}] {criterion}" for i, criterion in enumerate(successCriteria)])
            else:
                criteriaDisplay = "[]"

            promptBase = f"""TASK VALIDATION

=== TASK INFORMATION ===
{objectiveLabel}: '{objectiveText}'
EXPECTED DATA TYPE: {dataType}
EXPECTED FORMATS: {expectedFormats if expectedFormats else ['any']}{actionContext}{actionParamsContext}{validationMetadataContext}

=== VALIDATION INSTRUCTIONS ===
IMPORTANT: Different formats can represent the same data structure. Do not reject a format just because it differs from expected - check the structure summary for actual content.

VALIDATION RULES:
1. Use structure summary (sections, statistics, counts) as PRIMARY evidence. Trust structure over format claims.
2. For each criterion in criteriaMapping: evaluate ONLY that criterion. Do not mention other criteria.
3. Priority: Data completeness > Format compatibility. Missing data is more critical than format mismatch.
4. Format understanding: Different formats can represent equivalent data structures. Focus on content, not format name.
5. Data availability assessment: If delivered documents do not contain required data, clearly indicate this in findings. Re-reading the same documents might not help.

VALIDATION STEPS:
- Check ACTION VALIDATION METADATA first (if present) - this contains action-specific context
- Check structure summary for quantities, counts, statistics
- Compare found values with required values from criteria
- If structure unavailable, use metadata only (format, filename, size)
- Classify gaps: missing_data (less than required), incomplete_data (partial), wrong_structure (wrong organization), wrong_format (format mismatch but data present)
- Assess if documents contain the required data: If structure shows documents lack the data, note this in findings - data must be generated or obtained elsewhere, not re-extracted from same documents

SCORING:
- Data complete + structure matches → qualityScore: 0.9-1.0
- Data complete but format issues → qualityScore: 0.7-0.9
- Missing/incomplete data → qualityScore: <0.7
- Format mismatch only (data present) → qualityScore: 0.6-0.7

SUGGESTIONS:
- ONE suggestion per UNMET criterion, ordered by criteriaMapping index
- Reference actual structure values found and required values
- Calculate quantitative gaps when numbers are available
- Be specific and actionable based on structure evidence

=== OUTPUT FORMAT ===
{{
  "overallSuccess": false,
  "qualityScore": 0.0,
  "dataTypeMatch": false,
  "formatMatch": false,
  "documentCount": {len(documents)},
  "criteriaMapping": [
    {{
      "index": 0,
      "criterion": "exact_criterion_text",
      "met": false,
      "reason": "explanation_for_this_criterion_only"
    }}
  ],
  "gapAnalysis": "Brief gap summary",
  "gapType": "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap",
  "structureComparison": {{
    "required": {{}},
    "found": {{}},
    "gap": {{}}
  }},
  "improvementSuggestions": ["One suggestion per unmet criterion"],
  "validationDetails": [
    {{
      "documentName": "name.ext",
      "issues": ["Specific issue"],
      "suggestions": ["Specific fix"]
    }}
  ]
}}

=== DATA ===
SUCCESS CRITERIA TO VALIDATE in criteriaMapping array:
{criteriaDisplay}

DELIVERED DOCUMENTS ({len(documents)} items):
"""

            # Calculate available space for document summaries
            # Get the model that will be used for validation
            basePromptSize = len(promptBase.encode('utf-8'))
            availableBytes = self._calculateAvailablePromptSpace(basePromptSize)

            # Analyze documents with size constraints
            documentSummaries = self._analyzeDocumentsWithSizeLimit(documents, availableBytes)

            # Build final prompt with summaries at the end
            documentsJson = json.dumps(documentSummaries, indent=2, ensure_ascii=False)
            validationPrompt = promptBase + documentsJson

            # Call AI service for validation
            response = await self.services.ai.callAiPlanning(
                prompt=validationPrompt,
                placeholders=None,
                debugType="contentvalidation"
            )

            if not response or not response.strip():
                logger.warning("AI validation returned empty response")
                raise ValueError("AI validation failed - empty response")

            # Clean and extract JSON from response using proper JSON extraction utility
            # This handles nested structures and markdown code blocks correctly
            result = response.strip()
            logger.debug(f"AI validation response length: {len(result)}")

            # Extract JSON first
            extractedJson = self.services.utils.jsonExtractString(result)
            if not extractedJson:
                logger.debug(f"No JSON found in AI response: {result[:200]}...")
                logger.debug(f"Full AI response: {result}")
                raise ValueError("AI validation failed - no JSON in response")

            # Proactively fix Python-style booleans (False/True -> false/true) BEFORE parsing
            # This handles booleans in any context: standalone, in lists, in dicts, etc.
            # Use word boundaries but also handle cases where booleans are in brackets/arrays
            # Replace False/True regardless of context (word boundary handles string matching correctly)
            normalizedJson = re.sub(r'\bFalse\b', 'false', extractedJson)
            normalizedJson = re.sub(r'\bTrue\b', 'true', normalizedJson)

            logger.debug(f"Extracted JSON (before normalization): {extractedJson[:200]}...")
            logger.debug(f"Normalized JSON (after boolean fix): {normalizedJson[:200]}...")

            # Now try to parse the normalized JSON
            try:
                aiResult = json.loads(normalizedJson)
                logger.info("AI validation JSON parsed successfully")
            except json.JSONDecodeError as json_error:
                logger.warning(f"AI validation invalid JSON after normalization: {str(json_error)}")
                logger.debug(f"JSON content that failed: {normalizedJson[:500]}...")
                raise ValueError(f"AI validation failed - invalid JSON: {str(json_error)}")

            overall = aiResult.get("overallSuccess")
            quality = aiResult.get("qualityScore")
            details = aiResult.get("validationDetails")
            gap = aiResult.get("gapAnalysis", "")
            improvements = aiResult.get("improvementSuggestions", [])
            gap_type = aiResult.get("gapType", "")
            structure_comp = aiResult.get("structureComparison", {})
            criteria_mapping = aiResult.get("criteriaMapping", [])

            # Normalize while keeping failures explicit
            normalized = {
                "overallSuccess": overall if isinstance(overall, bool) else None,
                "qualityScore": float(quality) if isinstance(quality, (int, float)) else None,
                "documentCount": len(documentSummaries),
                "gapAnalysis": gap if gap else "",
                "gapType": gap_type if gap_type else "",
                "structureComparison": structure_comp if structure_comp else {},
                "criteriaMapping": criteria_mapping if isinstance(criteria_mapping, list) else [],
                "validationDetails": details if isinstance(details, list) else [{
                    "documentName": "AI Validation",
                    "gapAnalysis": gap
                }],
                "improvementSuggestions": improvements,
                "schemaCompliant": True,
                "originalType": "json",
                "missingFields": []
            }

            if normalized["overallSuccess"] is None:
                normalized["missingFields"].append("overallSuccess")
            if normalized["qualityScore"] is None:
                normalized["missingFields"].append("qualityScore")
            if normalized["missingFields"]:
                normalized["schemaCompliant"] = False

            return normalized
        except Exception as e:
            logger.error(f"AI validation failed: {str(e)}")
            raise

    def _createFailedValidationResult(self, errorMessage: str) -> Dict[str, Any]:
        """Create a standardized failed validation result"""
        return {
            "overallSuccess": False,
            "qualityScore": 0.0,
            "dataTypeMatch": False,
            "formatMatch": False,
            "documentCount": 0,
            "criteriaMapping": [],
            "gapAnalysis": errorMessage,
            "improvementSuggestions": [],
            "validationDetails": [],
            "schemaCompliant": True,
            "originalType": "error",
            "missingFields": [],
            "error": errorMessage
        }
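# --- Usage sketch (illustrative only, not part of the production wiring) ---
# A minimal manual-test driver, assuming stub "services" and document objects that
# mimic only the attributes used above (.ai.callAiPlanning, .utils.jsonExtractString,
# documentName / mimeType / documentData). All names below are hypothetical stand-ins;
# the real system injects its own services and document model.
if __name__ == "__main__":
    import asyncio
    from types import SimpleNamespace

    class _StubAi:
        async def callAiPlanning(self, prompt, placeholders=None, debugType=None):
            # Return a canned verdict so the parse/normalize path can be exercised offline.
            return json.dumps({
                "overallSuccess": True,
                "qualityScore": 0.95,
                "criteriaMapping": [{"index": 0, "criterion": "contains a table", "met": True, "reason": "table present"}],
                "gapAnalysis": "",
                "gapType": "no_gap",
                "validationDetails": []
            })

    class _StubUtils:
        def jsonExtractString(self, text):
            # Trivial extractor: the stub response is already bare JSON.
            return text

    stubServices = SimpleNamespace(ai=_StubAi(), utils=_StubUtils())
    stubDoc = SimpleNamespace(
        documentName="report.json",
        mimeType="application/json",
        documentData={"sections": [{"id": "s1", "content_type": "table", "title": "Totals",
                                    "elements": [{"caption": "Totals", "headers": ["a", "b"], "rows": [[1, 2]]}]}]},
    )

    validator = ContentValidator(services=stubServices)
    result = asyncio.run(validator.validateContent(
        documents=[stubDoc],
        intent={"primaryGoal": "Produce a totals table", "successCriteria": ["contains a table"]},
    ))
    print(json.dumps(result, indent=2))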