gateway/tests/functional/test12_json_split_merge.py
2026-01-23 01:10:00 +01:00

804 lines
33 KiB
Python

#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
JSON Split and Merge Test 12 - Tests JSON splitting and merging using workflow tools
Tests random splitting of JSON files into 3 parts and merging them back using ModularJsonMerger.
"""
import asyncio
import json
import sys
import os
import time
import random
from typing import Dict, Any, List, Optional, Tuple

# Add the gateway to path (go up 2 levels from tests/functional/) so the
# project-local "modules" package becomes importable when this file is run
# directly as a script.
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)

# Import JSON merger from workflow tools.
# NOTE(review): ModularJsonMerger and JsonMergeLogger are imported but not
# referenced anywhere in this module's visible code — confirm whether the
# import is needed (e.g. for its side effects) before removing it.
from modules.services.serviceAi.subJsonMerger import ModularJsonMerger, JsonMergeLogger
from modules.shared.jsonContinuation import getContexts
class JsonSplitMergeTester12:
    """Harness for test 12: JSON split/merge context generation checks."""

    def __init__(self):
        """Initialize empty per-run state (results, fixtures, log buffer)."""
        # Filled in by runTest() with the final summary dict.
        self.testResults = {}
        # Reserved for generated fixtures (not populated in this module).
        self.testJsonFiles = []
        # Every _log() call appends here; _writeLogFile() flushes it to disk.
        self.logBuffer = []
        # Path of the written log file, set by _writeLogFile().
        self.logFile = None
def createTestJsonFiles(self) -> List[Dict[str, Any]]:
    """Create various test JSON files with different structures.

    Returns a list of fixture dicts. Each entry has:
      - "name": logical file name used as the result key,
      - "data": a Python structure to be serialized to JSON, OR None when
        "jsonString" is provided instead,
      - "jsonString" (optional): a raw JSON-like string containing deliberate
        syntax errors, used to exercise the repair path,
      - optional boolean flags ("isComplete", "hasComments", ...) that tell
        testJsonSplitMerge() which scenario to run.
    """
    testFiles = [
        # Small nested config object — baseline split test.
        {
            "name": "config.json",
            "data": {
                "application": "Customer Manager",
                "version": "1.0.0",
                "database": {
                    "host": "localhost",
                    "port": 5432,
                    "name": "customers_db"
                },
                "api": {
                    "baseUrl": "https://api.example.com",
                    "timeout": 30
                }
            }
        },
        # Array-of-objects payload (likely cut mid-element when split).
        {
            "name": "customers.json",
            "data": {
                "customers": [
                    {"id": 1, "name": "John Doe", "email": "john@example.com", "phone": "+1234567890", "address": "123 Main St"},
                    {"id": 2, "name": "Jane Smith", "email": "jane@example.com", "phone": "+0987654321", "address": "456 Oak Ave"},
                    {"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "phone": "+1122334455", "address": "789 Pine Rd"},
                    {"id": 4, "name": "Alice Williams", "email": "alice@example.com", "phone": "+5566778899", "address": "321 Elm St"},
                    {"id": 5, "name": "Charlie Brown", "email": "charlie@example.com", "phone": "+9988776655", "address": "654 Maple Dr"}
                ]
            }
        },
        # Nested objects with booleans.
        {
            "name": "settings.json",
            "data": {
                "theme": {
                    "darkMode": True,
                    "fontSize": 14,
                    "language": "en"
                },
                "notifications": {
                    "email": True,
                    "sms": False,
                    "push": True
                },
                "features": {
                    "enableAnalytics": True,
                    "enableReports": False
                }
            }
        },
        # Larger flat array with floats/booleans.
        {
            "name": "products.json",
            "data": {
                "products": [
                    {"id": "P001", "name": "Product A", "price": 29.99, "category": "Electronics", "inStock": True},
                    {"id": "P002", "name": "Product B", "price": 49.99, "category": "Clothing", "inStock": True},
                    {"id": "P003", "name": "Product C", "price": 19.99, "category": "Books", "inStock": False},
                    {"id": "P004", "name": "Product D", "price": 99.99, "category": "Electronics", "inStock": True},
                    {"id": "P005", "name": "Product E", "price": 14.99, "category": "Books", "inStock": True},
                    {"id": "P006", "name": "Product F", "price": 79.99, "category": "Clothing", "inStock": True}
                ]
            }
        },
        # Deeply nested document/section/element hierarchy.
        {
            "name": "document_structure.json",
            "data": {
                "metadata": {
                    "title": "Test Document",
                    "author": "Test Author",
                    "date": "2025-01-05"
                },
                "documents": [
                    {
                        "id": "doc1",
                        "title": "Document 1",
                        "sections": [
                            {
                                "id": "sec1",
                                "content_type": "heading",
                                "elements": [
                                    {"type": "heading", "content": {"text": "Introduction", "level": 1}}
                                ]
                            },
                            {
                                "id": "sec2",
                                "content_type": "paragraph",
                                "elements": [
                                    {"type": "paragraph", "content": {"text": "This is a test paragraph."}}
                                ]
                            }
                        ]
                    }
                ]
            }
        },
        # Real captured AI response (falls back to a minimal table on error).
        {
            "name": "table_example.json",
            "data": self._loadTableJsonExample()
        },
        # Complete JSON: exercises the "no cut" detection (overlapContext='').
        {
            "name": "complete_json.json",
            "data": {
                "status": "complete",
                "message": "This is a complete, valid JSON object",
                "data": {
                    "items": [1, 2, 3, 4, 5],
                    "metadata": {
                        "version": "1.0",
                        "timestamp": "2025-01-05T12:00:00Z"
                    }
                }
            },
            "isComplete": True  # Flag to indicate this is complete JSON (not cut)
        },
        # The remaining fixtures carry deliberately broken JSON in
        # "jsonString"; "data" stays None and is never used for them.
        # The payloads are kept exactly as-is (interior lines included) —
        # they are the literal input to the repair function under test.
        {
            "name": "json_with_comments.json",
            "data": None,  # unused — jsonString below is the input
            "jsonString": '''{
// This is a single-line comment
"name": "Test",
"value": 42,
/* This is a multi-line comment
spanning multiple lines */
"items": [1, 2, 3],
"nested": {
// Another comment
"key": "value"
}
}''',
            "hasComments": True
        },
        {
            "name": "json_with_trailing_comma.json",
            "data": None,  # unused — jsonString below is the input
            "jsonString": '''{
"name": "Test",
"value": 42,
"items": [1, 2, 3,],
"nested": {
"key": "value",
}
}''',
            "hasTrailingComma": True
        },
        {
            "name": "json_with_unquoted_keys.json",
            "data": None,  # unused — jsonString below is the input
            "jsonString": '''{
name: "Test",
value: 42,
items: [1, 2, 3],
nested: {
key: "value"
}
}''',
            "hasUnquotedKeys": True
        },
        {
            "name": "json_with_invalid_escape.json",
            "data": None,  # unused — jsonString below is the input
            "jsonString": '''{
"name": "Test\\xInvalid",
"value": 42,
"description": "This has \\u invalid escape"
}''',
            "hasInvalidEscape": True
        },
        {
            "name": "json_mixed_errors.json",
            "data": None,  # unused — jsonString below is the input
            "jsonString": '''{
// Comment here
name: "Test", // Unquoted key
"value": 42,
"items": [1, 2, 3,], // Trailing comma
"description": "Has \\x invalid escape",
"nested": {
key: "value", // Unquoted key and trailing comma
}
}''',
            "hasMixedErrors": True
        }
    ]
    return testFiles
def _loadTableJsonExample(self) -> Dict[str, Any]:
"""Load the table JSON example from the debug prompts file."""
try:
# Import jsonUtils for closing incomplete JSON structures
from modules.shared.jsonUtils import closeJsonStructures, tryParseJson
# Path to the JSON example file
jsonExamplePath = os.path.join(
os.path.dirname(__file__), "..", "..", "..", "local", "debug", "prompts",
"20260105-214826-020-chapter_1_section_section_2_response_iteration_2.txt"
)
# Read the file content
with open(jsonExamplePath, 'r', encoding='utf-8') as f:
content = f.read()
# Remove markdown code block markers
jsonContent = content.strip()
if jsonContent.startswith('```json'):
jsonContent = jsonContent[7:] # Remove ```json
if jsonContent.startswith('```'):
jsonContent = jsonContent[3:] # Remove ```
jsonContent = jsonContent.strip()
if jsonContent.endswith('```'):
jsonContent = jsonContent[:-3] # Remove trailing ```
jsonContent = jsonContent.strip()
# The JSON is incomplete - use closeJsonStructures to complete it
closedJson = closeJsonStructures(jsonContent)
# Parse the closed JSON
parsedJson, error, _ = tryParseJson(closedJson)
if error is None and parsedJson is not None:
return parsedJson
else:
raise Exception(f"Failed to parse JSON after closing structures: {error}")
except Exception as e:
# If loading fails, return a minimal valid structure
print(f"Warning: Could not load table JSON example: {e}")
return {
"elements": [
{
"type": "table",
"content": {
"headers": ["Spalte1", "Spalte2", "Spalte3"],
"rows": [
[36761, 36767, 36779]
]
}
}
]
}
def splitJsonRandomly(self, jsonString: str, numParts: int = 3) -> List[str]:
    """
    Split jsonString at random character positions into numParts pieces.

    Simulates real AI response cuts: a split point can land anywhere, even
    mid-string or mid-number. The concatenation of the returned parts is
    always exactly jsonString.

    Bug fix: the previous lower bound for each split point was
    len(splitPoints) * minPartSize, which ignored where the previous point
    actually landed — points could duplicate or cluster, producing empty or
    sub-minimum parts despite the stated guarantee. Points are now drawn
    strictly increasing with a minimum gap, so every part has at least
    minPartSize characters whenever len(jsonString) >= numParts * minPartSize
    (for shorter inputs the clamped fallback may still shrink trailing parts).

    :param jsonString: text to split (typically a serialized JSON document)
    :param numParts: number of pieces to produce; values < 2 return [jsonString]
    :return: list of numParts substrings whose concatenation == jsonString
    """
    if numParts < 2:
        return [jsonString]
    jsonLength = len(jsonString)
    # Small minimum so cuts stay "random-looking" rather than evenly spaced.
    minPartSize = max(10, jsonLength // (numParts * 3))
    splitPoints: List[int] = []
    for drawn in range(numParts - 1):
        # Lower bound: one minimum-size gap past the previous point.
        previous = splitPoints[-1] if splitPoints else 0
        minPoint = previous + minPartSize
        # Upper bound: leave room for every part still to be cut.
        maxPoint = jsonLength - (numParts - 1 - drawn) * minPartSize
        if maxPoint <= minPoint:
            # Degenerate (input too short): fall back to the boundary.
            splitPoint = minPoint
        else:
            # Truly random point - can be in the middle of anything!
            splitPoint = random.randint(minPoint, maxPoint)
        splitPoints.append(splitPoint)
    # Points are strictly increasing by construction; no sort needed.
    parts = []
    start = 0
    for splitPoint in splitPoints:
        parts.append(jsonString[start:splitPoint])
        start = splitPoint
    parts.append(jsonString[start:])  # Last part
    return parts
def _log(self, message: str):
"""Add message to log buffer."""
self.logBuffer.append(message)
print(message)
def normalizeJson(self, jsonString: str) -> Optional[Dict[str, Any]]:
    """Parse jsonString, repairing truncated input as a fallback.

    First attempts a strict json.loads(). If that fails, tries to close
    incomplete structures via the project's jsonUtils helpers and parse
    again. Returns the parsed object, or None when every attempt fails.
    """
    try:
        return json.loads(jsonString)
    except json.JSONDecodeError:
        pass
    # Strict parse failed — attempt to close incomplete structures.
    try:
        from modules.shared.jsonUtils import closeJsonStructures, tryParseJson
        repaired = closeJsonStructures(jsonString)
        candidate, parseError, _ = tryParseJson(repaired)
        if parseError is None and candidate is not None:
            return candidate
    except Exception:
        # Best-effort fallback only; swallow and report failure below.
        pass
    return None
def compareJson(self, original: Dict[str, Any], merged: Dict[str, Any]) -> Dict[str, Any]:
    """Compare two parsed JSON structures.

    Serializes both with sorted keys for a canonical textual comparison and
    additionally walks the structures via _findDifferences() to collect a
    human-readable list of mismatches.

    :return: dict with keys exactMatch, differences, originalSize,
             mergedSize, sizeMatch
    """
    canonicalOriginal = json.dumps(original, sort_keys=True, indent=2)
    canonicalMerged = json.dumps(merged, sort_keys=True, indent=2)
    differences: List[str] = []
    self._findDifferences(original, merged, "", differences)
    return {
        "exactMatch": canonicalOriginal == canonicalMerged,
        "differences": differences,
        "originalSize": len(canonicalOriginal),
        "mergedSize": len(canonicalMerged),
        "sizeMatch": len(canonicalOriginal) == len(canonicalMerged)
    }
def _findDifferences(self, obj1: Any, obj2: Any, path: str, differences: List[str]):
"""Recursively find differences between two JSON objects."""
if type(obj1) != type(obj2):
differences.append(f"{path}: Type mismatch - {type(obj1).__name__} vs {type(obj2).__name__}")
return
if isinstance(obj1, dict):
allKeys = set(obj1.keys()) | set(obj2.keys())
for key in allKeys:
newPath = f"{path}.{key}" if path else key
if key not in obj1:
differences.append(f"{newPath}: Missing in original")
elif key not in obj2:
differences.append(f"{newPath}: Missing in merged")
else:
self._findDifferences(obj1[key], obj2[key], newPath, differences)
elif isinstance(obj1, list):
if len(obj1) != len(obj2):
differences.append(f"{path}: Length mismatch - {len(obj1)} vs {len(obj2)}")
else:
for i, (item1, item2) in enumerate(zip(obj1, obj2)):
newPath = f"{path}[{i}]"
self._findDifferences(item1, item2, newPath, differences)
else:
if obj1 != obj2:
differences.append(f"{path}: Value mismatch - {obj1} vs {obj2}")
async def testJsonSplitMerge(self, jsonFile: Dict[str, Any]) -> Dict[str, Any]:
    """Test splitting and merging a single JSON file.

    Three scenarios, selected by the fixture's flags:
      - "isComplete": the full JSON is passed uncut; getContexts() is
        expected to detect completeness and return overlapContext == "".
      - "jsonString" present: a deliberately broken JSON string is passed
        uncut; getContexts() is expected to repair it into valid JSON.
      - otherwise: the serialized data is cut at a random position
        (simulating a truncated AI response) before context generation.

    Returns a result dict consumed by runTest(); "success" means
    completePart parsed as valid JSON (and, for complete-JSON fixtures,
    that overlapContext is empty as well).
    """
    fileName = jsonFile["name"]
    # Check if this is a complete JSON test (no cut)
    isComplete = jsonFile.get("isComplete", False)
    # Check if this is a JSON string with errors (not from data dict)
    jsonString = jsonFile.get("jsonString")
    if jsonString:
        # Use the provided JSON string directly (may have errors)
        originalJsonString = jsonString
        originalData = None  # No original data for error tests
    else:
        # Convert data dict to JSON string
        originalData = jsonFile["data"]
        originalJsonString = json.dumps(originalData, indent=2, ensure_ascii=False)
    originalSize = len(originalJsonString)
    self._log("")
    self._log("="*80)
    testType = "COMPLETE JSON" if isComplete else ("JSON WITH ERRORS" if jsonString else "SPLIT JSON")
    self._log(f"TESTING {testType}: {fileName}")
    self._log("="*80)
    # Log original JSON
    self._log("")
    self._log("="*80)
    self._log("ORIGINAL JSON")
    self._log("="*80)
    self._log(f"JSON length: {originalSize} characters")
    if isComplete:
        self._log(" ⚠️ This is COMPLETE JSON (not cut) - testing overlapContext='' detection")
    if jsonString:
        # Describe which deliberate defects this fixture carries.
        errorType = []
        if jsonFile.get("hasComments"):
            errorType.append("comments")
        if jsonFile.get("hasTrailingComma"):
            errorType.append("trailing commas")
        if jsonFile.get("hasUnquotedKeys"):
            errorType.append("unquoted keys")
        if jsonFile.get("hasInvalidEscape"):
            errorType.append("invalid escapes")
        if jsonFile.get("hasMixedErrors"):
            errorType.append("mixed errors")
        if errorType:
            self._log(f" ⚠️ This JSON has errors: {', '.join(errorType)} - testing repair function")
    self._log("")
    self._log("Full JSON content:")
    self._log("-"*80)
    # Log the payload, eliding the middle of long documents.
    jsonLines = originalJsonString.split('\n')
    if len(jsonLines) > 50:
        for line in jsonLines[:25]:
            self._log(line)
        self._log(f"... ({len(jsonLines) - 50} lines omitted) ...")
        for line in jsonLines[-25:]:
            self._log(line)
    else:
        for line in jsonLines:
            self._log(line)
    # Handle complete JSON, JSON with errors, vs split JSON
    if isComplete or jsonString:
        # For complete JSON or JSON with errors, use the full string (no cut)
        # We want to test repair on the full error-containing JSON
        partContent = originalJsonString
        cutPosition = None  # No cut
        self._log("")
        self._log("="*80)
        if isComplete:
            self._log("COMPLETE JSON TEST (NO CUT)")
            self._log("="*80)
            self._log(" Testing that getContexts() detects complete JSON and sets overlapContext=''")
        else:
            self._log("JSON WITH ERRORS TEST (NO CUT)")
            self._log("="*80)
            self._log(" Testing that getContexts() repairs the errors and produces valid JSON")
    else:
        # Split JSON at random position (simulating AI response cut)
        self._log("")
        self._log("="*80)
        self._log("SPLITTING JSON AT RANDOM POSITION (SIMULATING AI RESPONSE CUT)")
        self._log("="*80)
        # Find random cut position (not at start or end)
        import random  # NOTE(review): redundant — random is imported at module level
        minCutPos = max(100, originalSize // 10)  # At least 10% from start
        maxCutPos = min(originalSize - 100, originalSize * 9 // 10)  # At least 10% from end
        # Ensure valid range
        if maxCutPos <= minCutPos:
            # For small JSON, just cut in the middle
            cutPosition = originalSize // 2
        else:
            cutPosition = random.randint(minCutPos, maxCutPos)
        # Get part from start to cut
        partContent = originalJsonString[:cutPosition]
    if not isComplete:
        # NOTE(review): this block also runs for the error-JSON fixtures,
        # where cutPosition is None — "Cut position: None" is then logged.
        self._log("")
        self._log("="*80)
        self._log("PART (from start to cut):")
        self._log("="*80)
        self._log(f"Cut position: {cutPosition} characters")
        self._log(f"Part length: {len(partContent)} characters")
        self._log("")
        self._log("Part content:")
        partLines = partContent.split('\n')
        if len(partLines) > 30:
            for line in partLines[:15]:
                self._log(f" {line}")
            self._log(f" ... ({len(partLines) - 30} lines omitted) ...")
            for line in partLines[-15:]:
                self._log(f" {line}")
        else:
            for line in partLines:
                self._log(f" {line}")
    # Generate contexts using getContexts()
    self._log("")
    self._log("="*80)
    self._log("GENERATING CONTINUATION CONTEXTS")
    self._log("="*80)
    # getContexts() is the unit under test; it is expected to expose
    # overlapContext, hierarchyContext, hierarchyContextForPrompt,
    # completePart and jsonParsingSuccess (attributes read below).
    contexts = getContexts(partContent)
    # Log overlap context
    self._log("")
    self._log("="*80)
    self._log("OVERLAP CONTEXT (for merging):")
    self._log("="*80)
    overlapLines = contexts.overlapContext.split('\n')
    if len(overlapLines) > 30:
        for line in overlapLines[:15]:
            self._log(f" {line}")
        self._log(f" ... ({len(overlapLines) - 30} lines omitted) ...")
        for line in overlapLines[-15:]:
            self._log(f" {line}")
    else:
        for line in overlapLines:
            self._log(f" {line}")
    # Log hierarchy context (full, without budget)
    self._log("")
    self._log("="*80)
    self._log("HIERARCHY CONTEXT (full structure, no budget):")
    self._log("="*80)
    hierarchyLines = contexts.hierarchyContext.split('\n')
    if len(hierarchyLines) > 30:
        for line in hierarchyLines[:15]:
            self._log(f" {line}")
        self._log(f" ... ({len(hierarchyLines) - 30} lines omitted) ...")
        for line in hierarchyLines[-15:]:
            self._log(f" {line}")
    else:
        for line in hierarchyLines:
            self._log(f" {line}")
    # Log hierarchy context for prompt (with budget)
    self._log("")
    self._log("="*80)
    self._log("HIERARCHY CONTEXT FOR PROMPT (with budget logic):")
    self._log("="*80)
    hierarchyPromptLines = contexts.hierarchyContextForPrompt.split('\n')
    for line in hierarchyPromptLines:
        self._log(f" {line}")
    # Test completePart as valid JSON
    self._log("")
    self._log("="*80)
    self._log("COMPLETE PART (should be valid JSON):")
    self._log("="*80)
    completeLines = contexts.completePart.split('\n')
    if len(completeLines) > 30:
        for line in completeLines[:15]:
            self._log(f" {line}")
        self._log(f" ... ({len(completeLines) - 30} lines omitted) ...")
        for line in completeLines[-15:]:
            self._log(f" {line}")
    else:
        for line in completeLines:
            self._log(f" {line}")
    # Validate completePart as JSON and check overlapContext
    self._log("")
    self._log("="*80)
    self._log("VALIDATION RESULTS:")
    self._log("="*80)
    # Check overlapContext for complete JSON
    if isComplete:
        if contexts.overlapContext == "":
            self._log(" ✅ overlapContext is empty (correct for complete JSON)")
        else:
            self._log(f" ❌ overlapContext is NOT empty: '{contexts.overlapContext[:50]}...'")
            self._log(" Expected empty string for complete JSON")
    # Validate completePart as JSON
    self._log("")
    self._log("VALIDATING COMPLETE PART AS JSON:")
    isValidJson = False
    parsedCompletePart = None
    jsonError = None
    try:
        parsedCompletePart = json.loads(contexts.completePart)
        isValidJson = True
        self._log(" ✅ completePart is valid JSON")
        self._log(f" Parsed type: {type(parsedCompletePart).__name__}")
        # For error tests, verify repair worked
        if jsonString:
            self._log(" ✅ JSON repair successful - errors were fixed")
        # For split JSON, compare with truncated JSON
        if not isComplete and not jsonString:
            # Compare with truncated JSON (not original) - parse the truncated part to compare
            from modules.shared.jsonUtils import closeJsonStructures, tryParseJson
            # Try to parse the truncated JSON part (with structures closed)
            truncatedClosed = closeJsonStructures(partContent)
            truncatedParsed, truncatedError, _ = tryParseJson(truncatedClosed)
            if truncatedParsed is not None:
                # Compare completePart with the parsed truncated JSON
                if isinstance(parsedCompletePart, dict) and isinstance(truncatedParsed, dict):
                    comparison = self.compareJson(truncatedParsed, parsedCompletePart)
                    self._log(f" Comparison with truncated JSON (at cut position {cutPosition}):")
                    self._log(f" Exact match: {comparison['exactMatch']}")
                    self._log(f" Size match: {comparison['sizeMatch']}")
                    if comparison['differences']:
                        self._log(f" Differences found: {len(comparison['differences'])}")
                        for diff in comparison['differences'][:10]:  # Show first 10 differences
                            self._log(f" - {diff}")
                        if len(comparison['differences']) > 10:
                            self._log(f" ... ({len(comparison['differences']) - 10} more differences)")
                    else:
                        self._log(" No differences found - completePart matches truncated JSON structure")
                elif isinstance(parsedCompletePart, list) and isinstance(truncatedParsed, list):
                    self._log(f" Both are lists: truncated={len(truncatedParsed)} items, completePart={len(parsedCompletePart)} items")
                else:
                    self._log(f" Different types: truncated={type(truncatedParsed).__name__}, completePart={type(parsedCompletePart).__name__}")
            else:
                self._log(f" Could not parse truncated JSON for comparison (error: {truncatedError})")
    except json.JSONDecodeError as e:
        isValidJson = False
        jsonError = str(e)
        self._log(f" ❌ completePart is NOT valid JSON")
        self._log(f" Error: {jsonError}")
        self._log(f" Error position: line {e.lineno}, column {e.colno}")
        if jsonString:
            self._log(" ❌ JSON repair FAILED - errors were not fixed")
    # Return test results
    result = {
        "success": isValidJson,
        "fileName": fileName,
        "originalSize": originalSize,
        "cutPosition": cutPosition if not isComplete else None,
        "partSize": len(partContent),
        "overlapContextSize": len(contexts.overlapContext),
        "hierarchyContextSize": len(contexts.hierarchyContext),
        "hierarchyContextForPromptSize": len(contexts.hierarchyContextForPrompt),
        "completePartSize": len(contexts.completePart),
        "isValidJson": isValidJson,
        "jsonError": jsonError,
        "parsedCompletePart": parsedCompletePart is not None,
        "jsonParsingSuccess": contexts.jsonParsingSuccess
    }
    # Add complete JSON specific checks
    if isComplete:
        result["overlapContextIsEmpty"] = contexts.overlapContext == ""
        result["isComplete"] = True
        # For complete JSON, success means overlapContext is empty AND valid JSON
        result["success"] = isValidJson and (contexts.overlapContext == "")
    # Add error test specific checks
    if jsonString:
        result["hasErrors"] = True
        result["repairSuccess"] = isValidJson
    return result
async def testAllJsonFiles(self) -> Dict[str, Any]:
    """Run the split/merge test for every generated fixture.

    Returns a mapping of fixture name -> per-file result dict. Exceptions
    are captured per file so one failure does not abort the whole suite.
    """
    print("\n" + "="*80)
    print("TESTING JSON SPLIT AND MERGE")
    print("="*80)
    results: Dict[str, Any] = {}
    for jsonFile in self.createTestJsonFiles():
        fixtureName = jsonFile["name"]
        try:
            results[fixtureName] = await self.testJsonSplitMerge(jsonFile)
            # Small delay between tests
            await asyncio.sleep(0.5)
        except Exception as testError:
            import traceback
            print(f"\n❌ Error testing {fixtureName}: {str(testError)}")
            print(traceback.format_exc())
            results[fixtureName] = {
                "success": False,
                "error": str(testError),
                "traceback": traceback.format_exc()
            }
    return results
def _writeLogFile(self):
    """Flush the buffered log lines to local/debug and remember the path."""
    logDir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "debug")
    os.makedirs(logDir, exist_ok=True)
    targetPath = os.path.join(logDir, "test12_json_split_merge_results.txt")
    with open(targetPath, 'w', encoding='utf-8') as logHandle:
        logHandle.write('\n'.join(self.logBuffer))
    # Record where the log landed for later inspection.
    self.logFile = targetPath
    print(f"\n📝 Detailed log written to: {targetPath}")
async def runTest(self):
    """Run the complete test suite, write the log file, print a summary.

    Returns self.testResults: on a normal run a dict with "success",
    "totalFiles", "successCount" and per-file "results"; when the suite
    itself crashes, a dict with "success": False, "error" and "traceback".

    Bug fix: the warning branches of the summary previously lived under the
    `if result.get("success")` arm, but per-file "success" is defined as
    exactly the conjunction of the conditions those warnings reported on
    (valid JSON, empty overlapContext, successful repair) — so they were
    unreachable and real failures printed only "FAILED - Unknown error".
    The diagnostics now live in the failure branch where they can fire.
    """
    self._log("="*80)
    self._log("JSON SPLIT AND MERGE TEST 12")
    self._log("="*80)
    try:
        # Test all JSON files
        results = await self.testAllJsonFiles()
        # Write log file
        self._writeLogFile()
        # Summary
        print("\n" + "="*80)
        print("TEST SUMMARY")
        print("="*80)
        successCount = 0
        for fileName, result in results.items():
            isComplete = result.get("isComplete", False)
            hasErrors = result.get("hasErrors", False)
            jsonError = result.get("jsonError", "Unknown error")
            if result.get("success"):
                successCount += 1
                if isComplete:
                    print(f"{fileName:30s}: Complete JSON - overlapContext='' and valid JSON")
                elif hasErrors:
                    print(f"{fileName:30s}: JSON with errors - repair successful")
                else:
                    print(f"{fileName:30s}: Valid JSON - completePart parsed successfully")
            elif "error" in result:
                # The per-file test raised; show the captured exception.
                print(f"{fileName:30s}: FAILED - {result['error']}")
            elif isComplete and not result.get("overlapContextIsEmpty", False):
                print(f"⚠️ {fileName:30s}: Complete JSON but overlapContext not empty")
            elif isComplete:
                print(f"⚠️ {fileName:30s}: Complete JSON but not valid - {jsonError}")
            elif hasErrors:
                print(f"{fileName:30s}: JSON with errors - repair failed - {jsonError}")
            else:
                print(f"⚠️ {fileName:30s}: Contexts generated but completePart is not valid JSON - {jsonError}")
        print(f"\nResults: {successCount}/{len(results)} successful")
        self.testResults = {
            "success": successCount == len(results),
            "totalFiles": len(results),
            "successCount": successCount,
            "results": results
        }
        return self.testResults
    except Exception as e:
        import traceback
        print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
        print(f"Traceback:\n{traceback.format_exc()}")
        self.testResults = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return self.testResults
async def main():
    """Entry point: run JSON split and merge test 12 and dump results as JSON."""
    tester = JsonSplitMergeTester12()
    outcome = await tester.runTest()
    # Machine-readable trailer so callers can scrape the final results.
    print("\n" + "="*80)
    print("FINAL RESULTS (JSON)")
    print("="*80)
    print(json.dumps(outcome, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())