tested json cut and merge handler for all scenarios

This commit is contained in:
ValueOn AG 2026-01-05 21:49:40 +01:00
parent d747054976
commit 16ebf1b152
3 changed files with 419 additions and 18 deletions

View file

@ -327,12 +327,14 @@ class JsonAnalyzer:
This creates valid JSON by closing all open strings, brackets/braces.
Unvollständige Keys werden entfernt, damit das Ergebnis valides JSON ist.
Unvollständige Keywords (true, false, null) werden vervollständigt.
Strategy:
1. Take the full truncated JSON
2. If we're in the middle of a string, close it
3. Remove incomplete key-value pairs (keys without values)
4. Close all open brackets/braces
3. Complete incomplete keywords (tr -> true, f -> false, n -> null)
4. Remove incomplete key-value pairs (keys without values)
5. Close all open brackets/braces
"""
result = self.jsonStr.rstrip()
@ -344,6 +346,9 @@ class JsonAnalyzer:
stringClosing = self._getStringClosing(result)
result += stringClosing
# Complete incomplete keywords (true, false, null)
result = self._completeIncompleteKeywords(result)
# Check if we're in the middle of a key (after colon)
# If string was just closed and we're after a colon with no value, remove the key
result = self._cleanIncompleteKeyValue(result)
@ -398,6 +403,85 @@ class JsonAnalyzer:
return jsonStr
def _completeIncompleteKeywords(self, jsonStr: str) -> str:
"""
Complete incomplete JSON keywords at the end of the string.
Checks the last element for incomplete keywords after colon:
- ": t*" or ": f*" or ": n*" -> complete to true/false/null
- ": " or ":" (without keyword) -> set to null
"""
result = jsonStr.rstrip()
# Find the last colon (not in string)
in_string = False
escaped = False
last_colon_pos = -1
for i in range(len(result) - 1, -1, -1):
char = result[i]
if escaped:
escaped = False
continue
if char == '\\' and in_string:
escaped = True
continue
if char == '"':
in_string = not in_string
continue
if not in_string and char == ':':
last_colon_pos = i
break
if last_colon_pos < 0:
return result
# Get text after the last colon
after_colon = result[last_colon_pos + 1:].strip()
# Check for incomplete keyword patterns
if after_colon.startswith('t') or after_colon.startswith('T'):
# Incomplete true
keyword_start = last_colon_pos + 1
# Skip whitespace
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
keyword_start += 1
# Remove partial keyword
keyword_end = keyword_start + 1
while keyword_end < len(result) and result[keyword_end].isalpha():
keyword_end += 1
return result[:keyword_start] + 'true' + result[keyword_end:]
elif after_colon.startswith('f') or after_colon.startswith('F'):
# Incomplete false
keyword_start = last_colon_pos + 1
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
keyword_start += 1
keyword_end = keyword_start + 1
while keyword_end < len(result) and result[keyword_end].isalpha():
keyword_end += 1
return result[:keyword_start] + 'false' + result[keyword_end:]
elif after_colon.startswith('n') or after_colon.startswith('N'):
# Incomplete null
keyword_start = last_colon_pos + 1
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
keyword_start += 1
keyword_end = keyword_start + 1
while keyword_end < len(result) and result[keyword_end].isalpha():
keyword_end += 1
return result[:keyword_start] + 'null' + result[keyword_end:]
elif not after_colon or after_colon == '':
# No keyword after colon -> set to null
return result + 'null'
return result
def _isIncompleteKey(self, jsonStr: str) -> bool:
"""
Check if the last string in the JSON is an incomplete key in an object.

View file

@ -540,10 +540,18 @@ class JsonSplitMergeTester12:
self._log(" ✅ completePart is valid JSON")
self._log(f" Parsed type: {type(parsedCompletePart).__name__}")
# Compare with original if possible
if isinstance(parsedCompletePart, dict) and isinstance(originalData, dict):
comparison = self.compareJson(originalData, parsedCompletePart)
self._log(f" Comparison with original:")
# Compare with truncated JSON (not original) - parse the truncated part to compare
from modules.shared.jsonUtils import closeJsonStructures, tryParseJson
# Try to parse the truncated JSON part (with structures closed)
truncatedClosed = closeJsonStructures(partContent)
truncatedParsed, truncatedError, _ = tryParseJson(truncatedClosed)
if truncatedParsed is not None:
# Compare completePart with the parsed truncated JSON
if isinstance(parsedCompletePart, dict) and isinstance(truncatedParsed, dict):
comparison = self.compareJson(truncatedParsed, parsedCompletePart)
self._log(f" Comparison with truncated JSON (at cut position {cutPosition}):")
self._log(f" Exact match: {comparison['exactMatch']}")
self._log(f" Size match: {comparison['sizeMatch']}")
if comparison['differences']:
@ -553,11 +561,13 @@ class JsonSplitMergeTester12:
if len(comparison['differences']) > 10:
self._log(f" ... ({len(comparison['differences']) - 10} more differences)")
else:
self._log(" No differences found")
elif isinstance(parsedCompletePart, list) and isinstance(originalData, list):
self._log(f" Both are lists: original={len(originalData)} items, completePart={len(parsedCompletePart)} items")
self._log(" No differences found - completePart matches truncated JSON structure")
elif isinstance(parsedCompletePart, list) and isinstance(truncatedParsed, list):
self._log(f" Both are lists: truncated={len(truncatedParsed)} items, completePart={len(parsedCompletePart)} items")
else:
self._log(f" Different types: original={type(originalData).__name__}, completePart={type(parsedCompletePart).__name__}")
self._log(f" Different types: truncated={type(truncatedParsed).__name__}, completePart={type(parsedCompletePart).__name__}")
else:
self._log(f" Could not parse truncated JSON for comparison (error: {truncatedError})")
except json.JSONDecodeError as e:
isValidJson = False

View file

@ -0,0 +1,307 @@
#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
JSON Completion Test 13 - Tests JSON completion at various cut positions
Tests a single JSON object (~300 chars) with all JSON structure types.
Cuts the JSON at every 5th position from character 50 to the end (always including the final position), completes it, and validates.
"""
import asyncio
import json
import sys
import os
from typing import Dict, Any, List
# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
# Import JSON continuation module
from modules.shared.jsonContinuation import getContexts
class JsonCompletionTester13:
    """Cuts a sample JSON document at many offsets and validates each completion.

    For every cut the truncated text is passed to ``getContexts`` and the
    returned ``completePart`` is checked with ``json.loads``. All output is
    printed and also collected so it can be written to a log file.
    """

    def __init__(self):
        # Summary dictionary produced by the most recent runTest() call.
        self.testResults = {}
        # Every message passed to _log(), in order.
        self.logBuffer = []
        # Path of the written log file, set by _writeLogFile().
        self.logFile = None

    def createTestJson(self) -> str:
        """
        Create a single pretty-printed JSON object containing all JSON value types:
        - Objects (nested)
        - Arrays (nested)
        - Strings
        - Numbers (integers and floats)
        - Booleans (true, false)
        - null

        Returns:
            The object serialized with ``indent=2``. If the serialized text is
            shorter than 300 characters, a padding "description" entry is added
            to "metadata"; with the payload below the text is already longer
            than that, so no padding is added.
        """
        testData = {
            "id": 12345,
            "name": "Test Object",
            "active": True,
            "inactive": False,
            "value": None,
            "price": 99.99,
            "tags": ["tag1", "tag2", "tag3"],
            "metadata": {
                "created": "2025-01-01",
                "updated": "2025-01-02",
                "version": 1,
            },
            "items": [
                {"id": 1, "name": "Item A", "count": 10},
                {"id": 2, "name": "Item B", "count": 20},
            ],
            "settings": {
                "theme": "dark",
                "notifications": True,
                "features": ["feature1", "feature2"],
            },
        }
        jsonString = json.dumps(testData, indent=2, ensure_ascii=False)

        # Pad short documents so there are enough cut positions to exercise.
        targetLength = 300
        if len(jsonString) < targetLength:
            testData["metadata"]["description"] = "A" * (targetLength - len(jsonString) + 20)
            jsonString = json.dumps(testData, indent=2, ensure_ascii=False)

        return jsonString

    def _log(self, message: str):
        """Print the message and keep it in the log buffer."""
        self.logBuffer.append(message)
        print(message)

    def _logFailedCuts(self, results: Dict[Any, Any], positions) -> None:
        """Log the error (and overlap tail, when recorded) of each given cut position."""
        for pos in positions:
            res = results[pos]
            self._log(f" Position {pos}: {res.get('jsonError', 'Unknown error')}")
            # Entries created by the exception path carry no overlap context.
            overlap = res.get('overlapContext', 'N/A')
            if overlap != 'N/A':
                self._log(f" Overlap: {overlap[-80:]}")

    async def testJsonCompletionAtCuts(self, jsonString: str, startPos: int = 50, step: int = 5) -> Dict[str, Any]:
        """
        Test JSON completion at various cut positions.

        Args:
            jsonString: The full JSON string to test
            startPos: Starting position for cuts (default 50)
            step: Step size between cuts (default 5)

        Returns:
            Dictionary with the keys "totalCuts", "successCount", "failedCount",
            "successRate" (percent), "results" (per cut position) and
            "failedCuts" (positions whose completion was not valid JSON).
            When startPos is at or beyond the end of jsonString no cut is
            tested and all counters are zero.
        """
        jsonLength = len(jsonString)
        results = {}

        self._log("")
        self._log("=" * 80)
        self._log("TESTING JSON COMPLETION AT VARIOUS CUT POSITIONS")
        self._log("=" * 80)
        self._log(f"JSON length: {jsonLength} characters")
        self._log(f"Testing cuts from position {startPos} to {jsonLength} (step: {step})")
        self._log("")

        cutPositions = list(range(startPos, jsonLength, step))
        # Always include the last position. The emptiness check prevents an
        # IndexError when the input is shorter than startPos.
        if cutPositions and cutPositions[-1] != jsonLength - 1:
            cutPositions.append(jsonLength - 1)
        if not cutPositions:
            self._log("No cut positions to test (startPos is beyond the end of the JSON)")

        successCount = 0
        totalCuts = len(cutPositions)

        for cutPos in cutPositions:
            truncatedJson = jsonString[:cutPos]

            # Any failure while generating or inspecting the contexts is
            # recorded as a failed cut so the remaining positions still run.
            try:
                contexts = getContexts(truncatedJson)
                completePart = contexts.completePart
                overlapContext = contexts.overlapContext

                # The completion counts as successful when it parses as JSON.
                jsonError = None
                try:
                    json.loads(completePart)
                    isValidJson = True
                except json.JSONDecodeError as e:
                    jsonError = str(e)
                    isValidJson = False

                # Only the tails are stored; slicing clamps on short strings.
                results[cutPos] = {
                    "cutPosition": cutPos,
                    "truncatedLength": len(truncatedJson),
                    "completePartLength": len(completePart),
                    "overlapContextLength": len(overlapContext),
                    "isValidJson": isValidJson,
                    "jsonError": jsonError,
                    "truncatedJson": truncatedJson[-50:],
                    "completePart": completePart[-100:],
                    "overlapContext": overlapContext[-100:],
                }

                if isValidJson:
                    successCount += 1
                    self._log(f"✅ Cut at position {cutPos:4d}: Valid JSON (completePart length: {len(completePart)}, overlap length: {len(overlapContext)})")
                else:
                    self._log(f"❌ Cut at position {cutPos:4d}: Invalid JSON - {jsonError}")
                    self._log(f" Truncated (last 50): {truncatedJson[-50:]}")
                    self._log(f" CompletePart (last 100): {completePart[-100:]}")
                self._log(f" Overlap: {overlapContext[-80:]}")

            except Exception as e:
                results[cutPos] = {
                    "cutPosition": cutPos,
                    "truncatedLength": len(truncatedJson),
                    "isValidJson": False,
                    "jsonError": f"Exception: {str(e)}",
                    "truncatedJson": truncatedJson[-50:],
                }
                self._log(f"❌ Cut at position {cutPos:4d}: Exception - {str(e)}")

        failedCount = totalCuts - successCount
        # Computed once and guarded, so an empty run cannot divide by zero.
        successRate = successCount / totalCuts * 100 if totalCuts > 0 else 0

        self._log("")
        self._log("=" * 80)
        self._log("CUT TEST SUMMARY")
        self._log("=" * 80)
        self._log(f"Total cuts tested: {totalCuts}")
        self._log(f"Successful completions: {successCount}")
        self._log(f"Failed completions: {failedCount}")
        self._log(f"Success rate: {successRate:.1f}%")
        self._log("")

        failedCuts = [pos for pos, res in results.items() if not res.get("isValidJson", False)]
        if failedCuts:
            self._log("Failed cuts:")
            self._logFailedCuts(results, failedCuts[:10])  # Show first 10 failures
            if len(failedCuts) > 10:
                self._log(f" ... ({len(failedCuts) - 10} more failures)")

        return {
            "totalCuts": totalCuts,
            "successCount": successCount,
            "failedCount": failedCount,
            "successRate": successRate,
            "results": results,
            "failedCuts": failedCuts,
        }

    def _writeLogFile(self):
        """Write the log buffer to local/debug/test13_json_completion_cuts_results.txt.

        The directory is resolved three levels above this file and is created
        if it does not exist. The written path is stored in ``self.logFile``.
        """
        logDir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "debug")
        os.makedirs(logDir, exist_ok=True)
        logFilePath = os.path.join(logDir, "test13_json_completion_cuts_results.txt")

        with open(logFilePath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(self.logBuffer))

        self.logFile = logFilePath
        print(f"\n📝 Detailed log written to: {logFilePath}")

    async def runTest(self):
        """Run the complete test.

        Returns:
            ``self.testResults``: on success the cut statistics plus a
            "success" flag (True when every tested cut produced valid JSON);
            if an exception escapes, a dictionary with "success" False,
            "error" and "traceback".
        """
        self._log("=" * 80)
        self._log("JSON COMPLETION TEST 13")
        self._log("=" * 80)

        try:
            jsonString = self.createTestJson()

            self._log("")
            self._log("=" * 80)
            self._log("TEST JSON OBJECT")
            self._log("=" * 80)
            self._log(f"Length: {len(jsonString)} characters")
            self._log("")
            self._log("Full JSON content:")
            self._log("-" * 80)
            for line in jsonString.split('\n'):
                self._log(line)

            results = await self.testJsonCompletionAtCuts(jsonString, startPos=50, step=5)

            self._log("")
            self._log("=" * 80)
            self._log("FINAL TEST SUMMARY")
            self._log("=" * 80)
            self._log(f"Total cuts tested: {results['totalCuts']}")
            self._log(f"✅ Successful: {results['successCount']}")
            self._log(f"❌ Failed: {results['failedCount']}")
            self._log(f"Success rate: {results['successRate']:.1f}%")

            if results['failedCuts']:
                self._log("")
                self._log("Failed cut positions:")
                self._logFailedCuts(results['results'], results['failedCuts'])

            # Written after the final summary so the file contains it too.
            self._writeLogFile()

            self.testResults = {
                "success": results['successCount'] == results['totalCuts'],
                "totalCuts": results['totalCuts'],
                "successCount": results['successCount'],
                "failedCount": results['failedCount'],
                "successRate": results['successRate'],
                "failedCuts": results['failedCuts'],
                "results": results['results'],
            }
            return self.testResults

        except Exception as e:
            import traceback
            print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
            print(f"Traceback:\n{traceback.format_exc()}")
            self.testResults = {
                "success": False,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            return self.testResults
async def main():
    """Execute JSON completion test 13 and print its result dictionary as JSON."""
    outcome = await JsonCompletionTester13().runTest()

    # Machine-readable dump of the results; default=str stringifies any value
    # the JSON encoder cannot serialize natively.
    separator = "=" * 80
    print("\n" + separator)
    print("FINAL RESULTS (JSON)")
    print(separator)
    print(json.dumps(outcome, indent=2, default=str))


# Run the test only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())