# gateway/modules/services/serviceAi/test_json_merger.py
# ValueOn AG 64590aa61e fixes
# 2026-01-04 20:01:34 +01:00
# (594 lines, 22 KiB, Python)
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Test cases for JSON merger with different use cases and random cuts.
Tests the robustness of the JSON merger by:
1. Creating test JSON for different use cases
2. Cutting it randomly at various points
3. Running the merger for each piece
4. Checking completeness against original
"""
import json
import random
import logging
import sys
import os
from typing import Dict, Any, List, Tuple
# Add project root to Python path
# Find project root by looking for gateway/modules structure
# --- Module-level path bootstrap -------------------------------------------
# Locate the project root (the directory that contains gateway/modules) so
# the project packages can be imported when this test runs as a script.
currentFile = os.path.abspath(__file__)
currentDir = os.path.dirname(currentFile)
# Navigate up from: gateway/modules/services/serviceAi/test_json_merger.py
# To project root: D:\Athi\Local\Web\poweron
# Try a few fixed numbers of levels up first.
candidates = [
    os.path.abspath(os.path.join(currentDir, '../../../../')),  # From gateway/modules/services/serviceAi
    os.path.abspath(os.path.join(currentDir, '../../..')),      # Alternative
    os.path.abspath(os.path.join(currentDir, '../..')),         # Another alternative
]
projectRoot = None
for candidate in candidates:
    gatewayModulesPath = os.path.join(candidate, 'gateway', 'modules')
    if os.path.exists(gatewayModulesPath):
        projectRoot = candidate
        break
# If still not found, walk up the tree looking for a directory that
# contains gateway/modules.
if projectRoot is None:
    searchDir = currentDir
    for _ in range(10):  # Max 10 levels up
        gatewayPath = os.path.join(searchDir, 'gateway')
        if os.path.exists(gatewayPath) and os.path.exists(os.path.join(gatewayPath, 'modules')):
            projectRoot = searchDir
            break
        parent = os.path.dirname(searchDir)
        if parent == searchDir:  # Reached the filesystem root
            break
        searchDir = parent
if projectRoot is None:
    raise RuntimeError(f"Could not find project root. Current file: {currentFile}")
# Add the gateway directory to the Python path (not the project root) so
# the `modules....` imports below resolve.
gatewayPath = os.path.join(projectRoot, 'gateway')
if gatewayPath not in sys.path:
    sys.path.insert(0, gatewayPath)
# Verify the resolved root actually contains gateway/modules.
modulesPath = os.path.join(projectRoot, 'gateway', 'modules')
if not os.path.exists(modulesPath):
    raise RuntimeError(f"Project root verification failed. Expected gateway/modules at: {modulesPath}")
try:
    from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
    from modules.services.serviceAi.subJsonMerger import JsonMergeLogger
    from modules.shared.jsonUtils import (
        normalizeJsonText, stripCodeFences, closeJsonStructures, tryParseJson,
        extractJsonStructureContext
    )
except ImportError as e:
    # Print diagnostic context before re-raising to help debug path issues.
    print(f"Import error: {e}")
    print(f"Project root: {projectRoot}")
    print(f"Gateway path: {gatewayPath}")
    print(f"Python path (first 3): {sys.path[:3]}")
    print(f"Looking for modules at: {modulesPath}")
    print(f"Exists: {os.path.exists(modulesPath)}")
    if os.path.exists(modulesPath):
        print(f"Contents: {os.listdir(modulesPath)[:5]}")
    raise
# Module-level logger (currently unused below; the tests print directly).
logger = logging.getLogger(__name__)
def createTestJsonForUseCase(useCaseId: str, size: int = 100) -> Dict[str, Any]:
    """
    Build a synthetic JSON payload for one use case.

    Args:
        useCaseId: Use case ID (section_content, chapter_structure, etc.)
        size: Size of test data (number of elements/rows/items)

    Returns:
        Test JSON dictionary

    Raises:
        ValueError: If useCaseId is not a known use case.
    """
    if useCaseId == "section_content":
        # Single table element whose row count grows with `size`.
        rows = []
        for rowIdx in range(size):
            rows.append([str(1947 + rowIdx), str(10000 + rowIdx * 100)])
        tableElement = {
            "type": "table",
            "content": {"headers": ["Year", "Value"], "rows": rows},
        }
        return {"elements": [tableElement]}
    if useCaseId == "chapter_structure":
        chapterList = []
        for idx in range(size):
            chapterList.append({
                "id": f"chapter_{idx}",
                "title": f"Chapter {idx}",
                "level": 1,
            })
        return {"documents": [{"chapters": chapterList}]}
    if useCaseId == "code_structure":
        fileEntries = []
        for idx in range(size):
            fileEntries.append({
                "id": f"file_{idx}",
                "filename": f"file_{idx}.py",
                "fileType": "python",
                "functions": [f"function_{idx}_{j}" for j in range(5)],
            })
        return {"files": fileEntries}
    if useCaseId == "code_content":
        fileEntries = []
        for idx in range(size):
            fileEntries.append({
                "id": f"file_{idx}",
                "content": f"# File {idx}\ndef function_{idx}():\n pass\n" * 10,
                "functions": [{"name": f"function_{idx}_{j}", "line": j * 3} for j in range(5)],
            })
        return {"files": fileEntries}
    raise ValueError(f"Unknown use case: {useCaseId}")
def cutJsonRandomly(jsonString: str, numCuts: int = 5, overlapSize: int = 100) -> List[str]:
    """
    Cut a JSON string randomly at different points WITH OVERLAP between fragments.

    Each fragment after the first starts `overlapSize` characters before the
    previous cut point so the merger has shared context to align on.

    Args:
        jsonString: JSON string to cut
        numCuts: Number of cuts to make
        overlapSize: Size of overlap between fragments (in characters)

    Returns:
        List of JSON fragments with overlap
    """
    fragments: List[str] = []
    totalLength = len(jsonString)
    if totalLength == 0:
        return []
    if numCuts <= 0:
        # No cuts requested - return the whole string as a single fragment.
        return [jsonString]
    # First fragment: from start to a random cut at 20-40% of the total length.
    currentPos = random.randint(int(totalLength * 0.2), int(totalLength * 0.4))
    fragments.append(jsonString[:currentPos])
    # Subsequent fragments: each starts overlapSize chars back, then continues.
    for _ in range(numCuts - 1):
        if currentPos >= totalLength:
            break
        overlapStart = max(0, currentPos - overlapSize)
        remaining = totalLength - currentPos
        if remaining < overlapSize * 2:
            # Not enough left for another overlapped cut - emit the tail once.
            fragments.append(jsonString[overlapStart:])
            # BUGFIX: mark the string as fully consumed. Previously currentPos
            # was left unchanged, so the post-loop tail append below ran as
            # well and produced a duplicate final fragment.
            currentPos = totalLength
            break
        # Next cut point: 20-40% of the remaining length past the current position.
        nextCutPoint = min(
            currentPos + random.randint(int(remaining * 0.2), int(remaining * 0.4)),
            totalLength,
        )
        fragments.append(jsonString[overlapStart:nextCutPoint])
        currentPos = nextCutPoint
    # Tail fragment (with overlap) for anything not yet covered.
    if currentPos < totalLength:
        overlapStart = max(0, currentPos - overlapSize)
        fragments.append(jsonString[overlapStart:])
    return fragments
def _printSection(label: str, text: str, maxLines: int) -> None:
    """Print up to maxLines lines of text under a labeled heading.

    Prints "(not available)" when text is empty.
    """
    print(f" {label}:")
    if text:
        lines = text.split('\n')
        for line in lines[:maxLines]:
            print(f" {line}")
        if len(lines) > maxLines:
            print(f" ... ({len(lines) - maxLines} more lines)")
    else:
        print(f" (not available)")


def _printTruncated(label: str, text: str) -> None:
    """Print text under a labeled heading, truncated to 200 characters.

    Prints "(not available)" when text is empty.
    """
    print(f" {label}:")
    if text:
        if len(text) > 200:
            print(f" {text[:200]}... ({len(text)} chars total)")
        else:
            print(f" {text}")
    else:
        print(f" (not available)")


def _printFragmentDiagnostics(fragments: List[str], useCaseId: str) -> None:
    """Print per-fragment structure-context diagnostics (debugging aid only)."""
    print(f"\n{'='*60}")
    print(f"FRAGMENT ANALYSIS (use case: {useCaseId})")
    print(f"{'='*60}")
    for fragIdx, fragment in enumerate(fragments):
        print(f"\nFragment {fragIdx + 1}/{len(fragments)}:")
        print(f" Length: {len(fragment)} chars")
        try:
            structureContext = extractJsonStructureContext(fragment, useCaseId)
            templateStructure = structureContext.get("template_structure", "")
            lastCompletePart = structureContext.get("last_complete_part", "")
            incompletePart = structureContext.get("incomplete_part", "")
            structureContextJson = structureContext.get("structure_context", "")
            # A fragment is "incomplete" when it does not parse as JSON on its own.
            normalized = stripCodeFences(normalizeJsonText(fragment)).strip()
            parsed, parseErr, _ = tryParseJson(normalized)
            if parseErr is not None or parsed is None:
                print(f" Status: INCOMPLETE (cut off)")
                _printSection("Template Structure", templateStructure, 5)
                _printSection("Structure Context", structureContextJson, 5)
                _printTruncated("Last Complete Part", lastCompletePart)
                _printTruncated("Incomplete Part", incompletePart)
            else:
                print(f" Status: COMPLETE")
                if structureContextJson:
                    _printSection("Structure Context", structureContextJson, 3)
        except Exception as e:
            print(f" Error extracting structure context: {e}")
    print(f"\n{'='*60}\n")


def _wrapListForUseCase(parsed: List[Any], useCaseId: str) -> Dict[str, Any]:
    """Wrap a bare top-level list in the dict structure expected for the use case."""
    if useCaseId == "chapter_structure":
        return {"documents": [{"chapters": parsed}]}
    if useCaseId in ("code_structure", "code_content"):
        return {"files": parsed}
    # section_content and unknown use cases both wrap as elements.
    return {"elements": parsed}


def testMergerWithFragments(
    originalJson: Dict[str, Any],
    fragments: List[str],
    useCaseId: str
) -> Tuple[bool, Dict[str, Any], str]:
    """
    Test the merger by merging fragments sequentially and parsing the result.

    Args:
        originalJson: Original complete JSON (accepted for API symmetry;
            not consulted here - completeness is checked by the caller)
        fragments: List of JSON fragments to merge
        useCaseId: Use case ID

    Returns:
        Tuple of (success, merged_json, error_message)
    """
    if not fragments:
        return False, {}, "No fragments provided"
    # Log structure context for each fragment (especially incomplete ones).
    _printFragmentDiagnostics(fragments, useCaseId)
    # Start with the first fragment and fold each subsequent one in.
    accumulated = fragments[0]
    for i, fragment in enumerate(fragments[1:], 1):
        try:
            accumulated, hasOverlap = JsonResponseHandler.mergeJsonStringsWithOverlap(
                accumulated, fragment
            )
            # In a real streaming scenario, a missing overlap stops iteration.
            if not hasOverlap:
                print(f" ⚠️ Fragment {i}: No overlap found - iterations would stop here")
            # Guard against the merger collapsing everything to an empty result.
            if not accumulated or accumulated.strip() in ['{"elements": []}', '{}', '']:
                return False, {}, f"Merge {i} returned empty JSON"
        except Exception as e:
            return False, {}, f"Merge {i} failed with error: {str(e)}"
    # Parse the merged result, trying progressively more forgiving strategies:
    # direct parse -> close open structures -> longest valid JSON prefix.
    try:
        normalized = stripCodeFences(normalizeJsonText(accumulated)).strip()
        parsed, parseErr, _ = tryParseJson(normalized)
        if parseErr is not None:
            try:
                closed = closeJsonStructures(normalized)
                parsed, parseErr2, _ = tryParseJson(closed)
                if parseErr2 is not None:
                    validPrefix = JsonResponseHandler._extractValidJsonPrefix(normalized)
                    if validPrefix:
                        parsed, parseErr3, _ = tryParseJson(validPrefix)
                        if parseErr3 is not None:
                            return False, {}, f"Final parse error: {str(parseErr3)}"
                    else:
                        return False, {}, f"Final parse error: {str(parseErr2)}"
            # BUGFIX: the exception was previously bound as `parseErr`,
            # shadowing the parse-result variable from tryParseJson above.
            except Exception as closeErr:
                return False, {}, f"Final parse error: {str(closeErr)}"
        if not parsed:
            return False, {}, "Final parse returned None"
        # A bare top-level list is normalized into the expected dict shape.
        if isinstance(parsed, list):
            parsed = _wrapListForUseCase(parsed, useCaseId)
        if not isinstance(parsed, dict):
            return False, {}, f"Final parse returned unexpected type: {type(parsed).__name__}"
        return True, parsed, ""
    except Exception as e:
        return False, {}, f"Final parse failed: {str(e)}"
def _extractChapters(doc: Dict[str, Any]) -> List[Any]:
    """Safely pull the chapter list out of {"documents": [{"chapters": [...]}]}.

    Returns [] when the documents list is missing, empty, or malformed.
    BUGFIX: the previous inline `doc.get("documents", [{}])[0]` raised
    IndexError when "documents" was present but an empty list.
    """
    documents = doc.get("documents", [])
    if not isinstance(documents, list) or not documents:
        return []
    first = documents[0]
    if not isinstance(first, dict):
        return []
    chapters = first.get("chapters", [])
    return chapters if isinstance(chapters, list) else []


def _extractTableRows(table: Dict[str, Any]) -> Any:
    """Return the rows from either {"content": {"rows": ...}} or {"rows": ...}."""
    content = table.get("content")
    if isinstance(content, dict):
        return content.get("rows", [])
    return table.get("rows", [])


def compareJsonCompleteness(
    original: Dict[str, Any],
    merged: Dict[str, Any],
    useCaseId: str
) -> Tuple[bool, str]:
    """
    Compare merged JSON with original to check completeness.

    Completeness means the merged result contains at least as many items
    (elements/rows/chapters/files) as the original.

    Args:
        original: Original JSON
        merged: Merged JSON (must be a dict)
        useCaseId: Use case ID

    Returns:
        Tuple of (is_complete, message)
    """
    # CRITICAL: Ensure merged is a dict before any .get() calls.
    if not isinstance(merged, dict):
        return False, f"Merged JSON is not a dict, got {type(merged).__name__}"
    if useCaseId == "section_content":
        origElements = original.get("elements", [])
        mergedElements = merged.get("elements", [])
        if not isinstance(origElements, list):
            return False, f"Original elements is not a list: {type(origElements).__name__}"
        if not isinstance(mergedElements, list):
            return False, f"Merged elements is not a list: {type(mergedElements).__name__}"
        if len(mergedElements) < len(origElements):
            return False, f"Missing elements: {len(origElements)} expected, {len(mergedElements)} found"
        # Also compare the row counts of the first table element.
        if origElements and mergedElements:
            origTable = origElements[0] if isinstance(origElements[0], dict) else {}
            mergedTable = mergedElements[0] if isinstance(mergedElements[0], dict) else {}
            if not origTable or not mergedTable:
                return False, f"Table structure missing: origTable={bool(origTable)}, mergedTable={bool(mergedTable)}"
            origRows = _extractTableRows(origTable)
            mergedRows = _extractTableRows(mergedTable)
            if not isinstance(origRows, list):
                return False, f"Original rows is not a list: {type(origRows).__name__}"
            if not isinstance(mergedRows, list):
                return False, f"Merged rows is not a list: {type(mergedRows).__name__}"
            if len(mergedRows) < len(origRows):
                return False, f"Missing rows: {len(origRows)} expected, {len(mergedRows)} found"
        return True, "Complete"
    elif useCaseId == "chapter_structure":
        origChapters = _extractChapters(original)
        mergedChapters = _extractChapters(merged)
        if len(mergedChapters) < len(origChapters):
            return False, f"Missing chapters: {len(origChapters)} expected, {len(mergedChapters)} found"
        return True, "Complete"
    elif useCaseId in ("code_structure", "code_content"):
        # Both code use cases share the same top-level {"files": [...]} shape,
        # so the previously duplicated branches are merged here.
        origFiles = original.get("files", [])
        mergedFiles = merged.get("files", [])
        if len(mergedFiles) < len(origFiles):
            return False, f"Missing files: {len(origFiles)} expected, {len(mergedFiles)} found"
        return True, "Complete"
    else:
        return False, f"Unknown use case: {useCaseId}"
def runTestForUseCase(useCaseId: str, size: int = 50, numTests: int = 10) -> Dict[str, Any]:
    """
    Run several randomized cut-and-merge tests for one use case.

    Args:
        useCaseId: Use case ID
        size: Size of test data
        numTests: Number of test runs

    Returns:
        Test results dictionary with pass/fail counts and error messages
    """
    summary: Dict[str, Any] = {
        "useCaseId": useCaseId,
        "size": size,
        "numTests": numTests,
        "passed": 0,
        "failed": 0,
        "errors": [],
    }

    def recordFailure(testNum: int, message: str) -> None:
        # Count the failure and keep its message for the final report.
        summary["failed"] += 1
        summary["errors"].append(f"Test {testNum + 1}: {message}")

    for testNum in range(numTests):
        try:
            # Build the reference payload and slice it at random points.
            originalJson = createTestJsonForUseCase(useCaseId, size)
            originalString = json.dumps(originalJson, indent=2, ensure_ascii=False)
            fragments = cutJsonRandomly(originalString, numCuts=random.randint(3, 7))
            # Merge the fragments back together.
            success, mergedJson, errorMsg = testMergerWithFragments(
                originalJson, fragments, useCaseId
            )
            if not success:
                recordFailure(testNum, errorMsg)
                continue
            # Verify nothing was lost relative to the original.
            isComplete, completenessMsg = compareJsonCompleteness(
                originalJson, mergedJson, useCaseId
            )
            if isComplete:
                summary["passed"] += 1
            else:
                recordFailure(testNum, completenessMsg)
        except Exception as e:
            recordFailure(testNum, f"Exception - {str(e)}")
    return summary
def runAllTests():
    """Run the cut-and-merge tests for every supported use case and print a summary."""
    allResults = []
    banner = '=' * 60
    for useCaseId in ("section_content", "chapter_structure", "code_structure", "code_content"):
        print(f"\n{banner}")
        print(f"Testing use case: {useCaseId}")
        print(f"{banner}")
        # Fresh merge-log file per use case (overwritten on each run).
        logFileName = f"json_merger_{useCaseId}.txt"
        JsonMergeLogger.initializeLogFile(logFileName)
        print(f"Log file: {logFileName}")
        caseResult = runTestForUseCase(useCaseId, size=50, numTests=10)
        allResults.append(caseResult)
        print(f"Passed: {caseResult['passed']}/{caseResult['numTests']}")
        print(f"Failed: {caseResult['failed']}/{caseResult['numTests']}")
        if caseResult["errors"]:
            print("\nErrors:")
            # Show at most the first 5 errors per use case.
            for error in caseResult["errors"][:5]:
                print(f" - {error}")
    # Aggregate totals across all use cases.
    print(f"\n{banner}")
    print("SUMMARY")
    print(f"{banner}")
    totalPassed = sum(r["passed"] for r in allResults)
    totalFailed = sum(r["failed"] for r in allResults)
    totalTests = sum(r["numTests"] for r in allResults)
    print(f"Total tests: {totalTests}")
    print(f"Passed: {totalPassed}")
    print(f"Failed: {totalFailed}")
    print(f"Success rate: {totalPassed / totalTests * 100:.1f}%")
    return allResults
if __name__ == "__main__":
    # Set up logging - use WARNING level to reduce noise from jsonUtils
    logging.basicConfig(level=logging.WARNING)
    # Run the full suite across all use cases
    results = runAllTests()
    # Persist the aggregated results in the project root for later inspection
    resultsFile = os.path.join(projectRoot, "test_json_merger_results.json")
    with open(resultsFile, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved to {resultsFile}")