# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Test cases for JSON merger with different use cases and random cuts.

Tests the robustness of the JSON merger by:
1. Creating test JSON for different use cases
2. Cutting it randomly at various points
3. Running the merger for each piece
4. Checking completeness against the original
"""
import json
import random
import logging
import sys
import os
from typing import Dict, Any, List, Tuple

# Add project root to Python path.
# Find project root by looking for the gateway/modules structure.
currentFile = os.path.abspath(__file__)
currentDir = os.path.dirname(currentFile)

# Navigate up from: gateway/modules/services/serviceAi/test_json_merger.py
# To project root: D:\Athi\Local\Web\poweron
# Try different levels up
candidates = [
    os.path.abspath(os.path.join(currentDir, '../../../../')),  # From gateway/modules/services/serviceAi
    os.path.abspath(os.path.join(currentDir, '../../..')),      # Alternative
    os.path.abspath(os.path.join(currentDir, '../..')),         # Another alternative
]

projectRoot = None
for candidate in candidates:
    gatewayModulesPath = os.path.join(candidate, 'gateway', 'modules')
    if os.path.exists(gatewayModulesPath):
        projectRoot = candidate
        break

# If still not found, search upwards for a gateway directory
if projectRoot is None:
    searchDir = currentDir
    for _ in range(10):  # Max 10 levels up
        gatewayPath = os.path.join(searchDir, 'gateway')
        if os.path.exists(gatewayPath) and os.path.exists(os.path.join(gatewayPath, 'modules')):
            projectRoot = searchDir
            break
        parent = os.path.dirname(searchDir)
        if parent == searchDir:  # Reached filesystem root
            break
        searchDir = parent

if projectRoot is None:
    raise RuntimeError(f"Could not find project root. Current file: {currentFile}")

# Add the gateway directory to the Python path (not the project root)
gatewayPath = os.path.join(projectRoot, 'gateway')
if gatewayPath not in sys.path:
    sys.path.insert(0, gatewayPath)

# Verify the path is correct
modulesPath = os.path.join(projectRoot, 'gateway', 'modules')
if not os.path.exists(modulesPath):
    raise RuntimeError(f"Project root verification failed. Expected gateway/modules at: {modulesPath}")

try:
    from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
    from modules.services.serviceAi.subJsonMerger import JsonMergeLogger
    from modules.shared.jsonUtils import (
        normalizeJsonText,
        stripCodeFences,
        closeJsonStructures,
        tryParseJson,
        extractJsonStructureContext
    )
except ImportError as e:
    # Print debugging hints before re-raising
    print(f"Import error: {e}")
    print(f"Project root: {projectRoot}")
    print(f"Gateway path: {gatewayPath}")
    print(f"Python path (first 3): {sys.path[:3]}")
    print(f"Looking for modules at: {modulesPath}")
    print(f"Exists: {os.path.exists(modulesPath)}")
    if os.path.exists(modulesPath):
        print(f"Contents: {os.listdir(modulesPath)[:5]}")
    raise

logger = logging.getLogger(__name__)
def createTestJsonForUseCase(useCaseId: str, size: int = 100) -> Dict[str, Any]:
    """
    Create test JSON for a specific use case.

    Args:
        useCaseId: Use case ID (section_content, chapter_structure, etc.)
        size: Size of test data (number of elements/rows/items)

    Returns:
        Test JSON dictionary
    """
    if useCaseId == "section_content":
        # Create a table with rows
        elements = [{
            "type": "table",
            "content": {
                "headers": ["Year", "Value"],
                "rows": [[str(1947 + i), str(10000 + i * 100)] for i in range(size)]
            }
        }]
        return {"elements": elements}

    elif useCaseId == "chapter_structure":
        chapters = [{
            "id": f"chapter_{i}",
            "title": f"Chapter {i}",
            "level": 1
        } for i in range(size)]
        return {"documents": [{"chapters": chapters}]}

    elif useCaseId == "code_structure":
        files = [{
            "id": f"file_{i}",
            "filename": f"file_{i}.py",
            "fileType": "python",
            "functions": [f"function_{i}_{j}" for j in range(5)]
        } for i in range(size)]
        return {"files": files}

    elif useCaseId == "code_content":
        files = [{
            "id": f"file_{i}",
            "content": f"# File {i}\ndef function_{i}():\n    pass\n" * 10,
            "functions": [{"name": f"function_{i}_{j}", "line": j * 3} for j in range(5)]
        } for i in range(size)]
        return {"files": files}

    else:
        raise ValueError(f"Unknown use case: {useCaseId}")


def cutJsonRandomly(jsonString: str, numCuts: int = 5, overlapSize: int = 100) -> List[str]:
    """
    Cut a JSON string randomly at different points, WITH overlap between fragments.

    Each fragment overlaps the previous one to help merging.

    Args:
        jsonString: JSON string to cut
        numCuts: Number of cuts to make
        overlapSize: Size of overlap between fragments (in characters)

    Returns:
        List of JSON fragments with overlap
    """
    fragments = []
    currentPos = 0
    totalLength = len(jsonString)

    if totalLength == 0:
        return []

    if numCuts > 0:
        # First fragment: from start to first cut point (between 20% and 40% of total)
        firstCutPoint = random.randint(int(totalLength * 0.2), int(totalLength * 0.4))
        fragments.append(jsonString[:firstCutPoint])
        currentPos = firstCutPoint
    else:
        # No cuts - return the whole string
        return [jsonString]

    # Subsequent fragments: each starts with overlap from the previous one, then continues
    for _ in range(numCuts - 1):
        if currentPos >= totalLength:
            break

        # Overlap start: go back overlapSize characters from the current position
        overlapStart = max(0, currentPos - overlapSize)

        remaining = totalLength - currentPos
        if remaining < overlapSize * 2:
            # Not enough remaining - add the rest as the last fragment.
            # Mark the string as consumed so the tail is not appended a
            # second time below (this duplicated the final fragment before).
            fragments.append(jsonString[overlapStart:])
            currentPos = totalLength
            break

        # Next cut point: between 20% and 40% of the remaining length
        nextCutPoint = currentPos + random.randint(int(remaining * 0.2), int(remaining * 0.4))
        nextCutPoint = min(nextCutPoint, totalLength)

        # Fragment: from the overlap start to the next cut point
        fragments.append(jsonString[overlapStart:nextCutPoint])
        currentPos = nextCutPoint

    # Add the remainder as the last fragment (with overlap)
    if currentPos < totalLength:
        overlapStart = max(0, currentPos - overlapSize)
        fragments.append(jsonString[overlapStart:])

    return fragments
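
# A minimal illustrative run (a sketch, not part of the test suite): every
# fragment after the first begins with the last overlapSize characters of its
# predecessor, so the merger always has a shared region to anchor on.
def _demoOverlapCut() -> None:
    sample = json.dumps({"elements": [{"id": i} for i in range(50)]})
    pieces = cutJsonRandomly(sample, numCuts=3, overlapSize=20)
    for prev, nxt in zip(pieces, pieces[1:]):
        # Each fragment starts with the tail of the previous one
        print(f"shared tail carried into next fragment: {nxt.startswith(prev[-20:])}")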
def testMergerWithFragments(
    originalJson: Dict[str, Any],
    fragments: List[str],
    useCaseId: str
) -> Tuple[bool, Dict[str, Any], str]:
    """
    Test the merger by merging fragments sequentially.

    Args:
        originalJson: Original complete JSON
        fragments: List of JSON fragments to merge
        useCaseId: Use case ID

    Returns:
        Tuple of (success, merged_json, error_message)
    """
    if not fragments:
        return False, {}, "No fragments provided"

    # Log the structure context for each fragment (especially incomplete ones)
    print(f"\n{'='*60}")
    print(f"FRAGMENT ANALYSIS (use case: {useCaseId})")
    print(f"{'='*60}")

    for fragIdx, fragment in enumerate(fragments):
        print(f"\nFragment {fragIdx + 1}/{len(fragments)}:")
        print(f"  Length: {len(fragment)} chars")

        # Extract the structure context for this fragment
        try:
            structureContext = extractJsonStructureContext(fragment, useCaseId)
            templateStructure = structureContext.get("template_structure", "")
            lastCompletePart = structureContext.get("last_complete_part", "")
            incompletePart = structureContext.get("incomplete_part", "")
            structureContextJson = structureContext.get("structure_context", "")

            # Check whether the fragment is incomplete
            normalized = stripCodeFences(normalizeJsonText(fragment)).strip()
            parsed, parseErr, _ = tryParseJson(normalized)
            isIncomplete = parseErr is not None or parsed is None

            if isIncomplete:
                print("  Status: INCOMPLETE (cut off)")

                print("  Template Structure:")
                if templateStructure:
                    # Show the first few lines of the template
                    templateLines = templateStructure.split('\n')
                    for line in templateLines[:5]:
                        print(f"    {line}")
                    if len(templateLines) > 5:
                        print(f"    ... ({len(templateLines) - 5} more lines)")
                else:
                    print("    (not available)")

                print("  Structure Context:")
                if structureContextJson:
                    contextLines = structureContextJson.split('\n')
                    for line in contextLines[:5]:
                        print(f"    {line}")
                    if len(contextLines) > 5:
                        print(f"    ... ({len(contextLines) - 5} more lines)")
                else:
                    print("    (not available)")

                print("  Last Complete Part:")
                if lastCompletePart:
                    # Truncate if too long
                    if len(lastCompletePart) > 200:
                        print(f"    {lastCompletePart[:200]}... ({len(lastCompletePart)} chars total)")
                    else:
                        print(f"    {lastCompletePart}")
                else:
                    print("    (not available)")

                print("  Incomplete Part:")
                if incompletePart:
                    # Truncate if too long
                    if len(incompletePart) > 200:
                        print(f"    {incompletePart[:200]}... ({len(incompletePart)} chars total)")
                    else:
                        print(f"    {incompletePart}")
                else:
                    print("    (not available)")
            else:
                print("  Status: COMPLETE")
                if structureContextJson:
                    print("  Structure Context:")
                    contextLines = structureContextJson.split('\n')
                    for line in contextLines[:3]:
                        print(f"    {line}")
                    if len(contextLines) > 3:
                        print(f"    ... ({len(contextLines) - 3} more lines)")
        except Exception as e:
            print(f"  Error extracting structure context: {e}")

    print(f"\n{'='*60}\n")

    # Start with the first fragment and fold the rest into it
    accumulated = fragments[0]
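
    # Illustration of the merge step below (assumed semantics, not verified
    # against subJsonResponseHandling): mergeJsonStringsWithOverlap is expected
    # to locate the region the two strings share and splice them there, e.g.
    #   accumulated: ...'"rows": [["1947", "10000"], ["1948",'
    #   fragment:        '["1948", "10100"], ["1949", "10200"]'...
    #   result:      ...'"rows": [["1947", "10000"], ["1948", "10100"], ...'
    # and to report hasOverlap=False when no common region is found.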
    for i, fragment in enumerate(fragments[1:], 1):
        try:
            accumulated, hasOverlap = JsonResponseHandler.mergeJsonStringsWithOverlap(
                accumulated, fragment
            )

            # Log if no overlap was found (iterations would stop in a real scenario)
            if not hasOverlap:
                print(f"  ⚠️ Fragment {i}: No overlap found - iterations would stop here")

            # Check if the result is empty (should never happen)
            if not accumulated or accumulated.strip() in ['{"elements": []}', '{}', '']:
                return False, {}, f"Merge {i} returned empty JSON"
        except Exception as e:
            return False, {}, f"Merge {i} failed with error: {str(e)}"

    # Parse the merged result
    try:
        # Normalize, then try to parse directly
        normalized = stripCodeFences(normalizeJsonText(accumulated)).strip()
        parsed, parseErr, _ = tryParseJson(normalized)

        if parseErr is not None:
            # Try closing structures in case the JSON is incomplete
            try:
                closed = closeJsonStructures(normalized)
                parsed, parseErr2, _ = tryParseJson(closed)
                if parseErr2 is not None:
                    # Fall back to extracting a valid JSON prefix
                    # (JsonResponseHandler is already imported at module level)
                    validPrefix = JsonResponseHandler._extractValidJsonPrefix(normalized)
                    if validPrefix:
                        parsed, parseErr3, _ = tryParseJson(validPrefix)
                        if parseErr3 is not None:
                            return False, {}, f"Final parse error: {str(parseErr3)}"
                    else:
                        return False, {}, f"Final parse error: {str(parseErr2)}"
            except Exception as closeErr:
                # Renamed from parseErr to avoid shadowing the outer variable
                return False, {}, f"Final parse error: {str(closeErr)}"

        if not parsed:
            return False, {}, "Final parse returned None"

        # CRITICAL: Ensure parsed is a dict, not a list.
        # If it is a list, wrap it in the expected structure for the use case.
        if isinstance(parsed, list):
            if useCaseId == "section_content":
                # List of elements - wrap in the elements structure
                parsed = {"elements": parsed}
            elif useCaseId == "chapter_structure":
                # List of chapters - wrap in the documents structure
                parsed = {"documents": [{"chapters": parsed}]}
            elif useCaseId in ("code_structure", "code_content"):
                # List of files - wrap in the files structure
                parsed = {"files": parsed}
            else:
                # Unknown use case - wrap as elements
                parsed = {"elements": parsed}

        # Ensure it is a dict now
        if not isinstance(parsed, dict):
            return False, {}, f"Final parse returned unexpected type: {type(parsed).__name__}"

        return True, parsed, ""
    except Exception as e:
        return False, {}, f"Final parse failed: {str(e)}"
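
# A minimal sketch of the parse-fallback chain used above, on a deliberately
# truncated string (a hypothetical helper, not part of the test suite). It
# assumes tryParseJson returns (parsed, error, extra) and closeJsonStructures
# appends the missing closing brackets/braces, matching the calls above.
def _demoParseFallback() -> None:
    truncated = '{"elements": [{"type": "table", "content": {"rows": [["1947", "10000"]'
    normalized = stripCodeFences(normalizeJsonText(truncated)).strip()
    parsed, parseErr, _ = tryParseJson(normalized)
    if parseErr is not None:
        # Direct parsing fails on truncated input; closing the open
        # structures should yield something the JSON parser can handle.
        closed = closeJsonStructures(normalized)
        parsed, parseErr, _ = tryParseJson(closed)
    print(f"Recovered: {parsed is not None}, error: {parseErr}")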
def compareJsonCompleteness(
    original: Dict[str, Any],
    merged: Dict[str, Any],
    useCaseId: str
) -> Tuple[bool, str]:
    """
    Compare the merged JSON with the original to check completeness.

    Args:
        original: Original JSON
        merged: Merged JSON (must be a dict)
        useCaseId: Use case ID

    Returns:
        Tuple of (is_complete, message)
    """
    # CRITICAL: Ensure merged is a dict
    if not isinstance(merged, dict):
        return False, f"Merged JSON is not a dict, got {type(merged).__name__}"

    if useCaseId == "section_content":
        origElements = original.get("elements", [])
        mergedElements = merged.get("elements", [])

        if not isinstance(origElements, list):
            return False, f"Original elements is not a list: {type(origElements).__name__}"
        if not isinstance(mergedElements, list):
            return False, f"Merged elements is not a list: {type(mergedElements).__name__}"

        if len(mergedElements) < len(origElements):
            return False, f"Missing elements: {len(origElements)} expected, {len(mergedElements)} found"

        # Check the table rows
        if origElements and mergedElements:
            origTable = origElements[0] if isinstance(origElements[0], dict) else {}
            mergedTable = mergedElements[0] if isinstance(mergedElements[0], dict) else {}

            if not origTable or not mergedTable:
                return False, f"Table structure missing: origTable={bool(origTable)}, mergedTable={bool(mergedTable)}"

            # Rows may live under content.rows or directly under rows
            origRows = origTable.get("content", {}).get("rows", []) if isinstance(origTable.get("content"), dict) else origTable.get("rows", [])
            mergedRows = mergedTable.get("content", {}).get("rows", []) if isinstance(mergedTable.get("content"), dict) else mergedTable.get("rows", [])

            if not isinstance(origRows, list):
                return False, f"Original rows is not a list: {type(origRows).__name__}"
            if not isinstance(mergedRows, list):
                return False, f"Merged rows is not a list: {type(mergedRows).__name__}"

            if len(mergedRows) < len(origRows):
                return False, f"Missing rows: {len(origRows)} expected, {len(mergedRows)} found"

        return True, "Complete"

    elif useCaseId == "chapter_structure":
        # Guard against an empty documents list, which would make [0] raise
        origDocs = original.get("documents") or [{}]
        mergedDocs = merged.get("documents") or [{}]
        origChapters = origDocs[0].get("chapters", [])
        mergedChapters = mergedDocs[0].get("chapters", [])

        if len(mergedChapters) < len(origChapters):
            return False, f"Missing chapters: {len(origChapters)} expected, {len(mergedChapters)} found"

        return True, "Complete"

    elif useCaseId in ("code_structure", "code_content"):
        origFiles = original.get("files", [])
        mergedFiles = merged.get("files", [])

        if len(mergedFiles) < len(origFiles):
            return False, f"Missing files: {len(origFiles)} expected, {len(mergedFiles)} found"

        return True, "Complete"

    else:
        return False, f"Unknown use case: {useCaseId}"
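
# A minimal illustrative check (hypothetical data, a sketch rather than part
# of the test suite): a merged result that dropped a chapter is reported as
# incomplete by compareJsonCompleteness.
def _demoCompletenessCheck() -> None:
    original = {"documents": [{"chapters": [{"id": "chapter_0"}, {"id": "chapter_1"}]}]}
    merged = {"documents": [{"chapters": [{"id": "chapter_0"}]}]}
    isComplete, message = compareJsonCompleteness(original, merged, "chapter_structure")
    # Expected: Complete: False, message: Missing chapters: 2 expected, 1 found
    print(f"Complete: {isComplete}, message: {message}")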
def runTestForUseCase(useCaseId: str, size: int = 50, numTests: int = 10) -> Dict[str, Any]:
    """
    Run multiple tests for a use case with random cuts.

    Args:
        useCaseId: Use case ID
        size: Size of test data
        numTests: Number of test runs

    Returns:
        Test results dictionary
    """
    results = {
        "useCaseId": useCaseId,
        "size": size,
        "numTests": numTests,
        "passed": 0,
        "failed": 0,
        "errors": []
    }

    for testNum in range(numTests):
        try:
            # Create test JSON
            originalJson = createTestJsonForUseCase(useCaseId, size)
            originalString = json.dumps(originalJson, indent=2, ensure_ascii=False)

            # Cut it randomly
            fragments = cutJsonRandomly(originalString, numCuts=random.randint(3, 7))

            # Run the merger
            success, mergedJson, errorMsg = testMergerWithFragments(
                originalJson, fragments, useCaseId
            )

            if not success:
                results["failed"] += 1
                results["errors"].append(f"Test {testNum + 1}: {errorMsg}")
                continue

            # Check completeness
            isComplete, completenessMsg = compareJsonCompleteness(
                originalJson, mergedJson, useCaseId
            )

            if isComplete:
                results["passed"] += 1
            else:
                results["failed"] += 1
                results["errors"].append(f"Test {testNum + 1}: {completenessMsg}")
        except Exception as e:
            results["failed"] += 1
            results["errors"].append(f"Test {testNum + 1}: Exception - {str(e)}")

    return results


def runAllTests():
    """Run tests for all use cases."""
    useCases = [
        "section_content",
        "chapter_structure",
        "code_structure",
        "code_content"
    ]

    allResults = []
    for useCaseId in useCases:
        print(f"\n{'='*60}")
        print(f"Testing use case: {useCaseId}")
        print(f"{'='*60}")

        # Initialize the log file for this use case (overwritten on each run)
        logFileName = f"json_merger_{useCaseId}.txt"
        JsonMergeLogger.initializeLogFile(logFileName)
        print(f"Log file: {logFileName}")

        results = runTestForUseCase(useCaseId, size=50, numTests=10)
        allResults.append(results)

        print(f"Passed: {results['passed']}/{results['numTests']}")
        print(f"Failed: {results['failed']}/{results['numTests']}")

        if results["errors"]:
            print("\nErrors:")
            for error in results["errors"][:5]:  # Show the first 5 errors
                print(f"  - {error}")

    # Summary
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")

    totalPassed = sum(r["passed"] for r in allResults)
    totalFailed = sum(r["failed"] for r in allResults)
    totalTests = sum(r["numTests"] for r in allResults)

    print(f"Total tests: {totalTests}")
    print(f"Passed: {totalPassed}")
    print(f"Failed: {totalFailed}")
    print(f"Success rate: {totalPassed / totalTests * 100:.1f}%")

    return allResults


if __name__ == "__main__":
    # Set up logging - use WARNING level to reduce noise from jsonUtils
    logging.basicConfig(level=logging.WARNING)

    # Run all tests
    results = runAllTests()

    # Save the results to a file in the project root
    resultsFile = os.path.join(projectRoot, "test_json_merger_results.json")
    with open(resultsFile, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved to {resultsFile}")
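
# Example invocation from the project root (assumed path layout):
#   python gateway/modules/services/serviceAi/test_json_merger.py
# To exercise a single use case interactively, runTestForUseCase can be
# called directly, e.g. runTestForUseCase("section_content", size=10, numTests=3).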