From 16ebf1b1520d623fcad2c419c113a0743d061d7f Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 5 Jan 2026 21:49:40 +0100
Subject: [PATCH] tested json cut and merge handler for all scenarios
---
modules/shared/jsonContinuation.py | 88 ++++-
tests/functional/test12_json_split_merge.py | 42 ++-
.../functional/test13_json_completion_cuts.py | 307 ++++++++++++++++++
3 files changed, 419 insertions(+), 18 deletions(-)
create mode 100644 tests/functional/test13_json_completion_cuts.py
diff --git a/modules/shared/jsonContinuation.py b/modules/shared/jsonContinuation.py
index 2fabd103..da35ceab 100644
--- a/modules/shared/jsonContinuation.py
+++ b/modules/shared/jsonContinuation.py
@@ -327,12 +327,14 @@ class JsonAnalyzer:
This creates valid JSON by closing all open strings, brackets/braces.
Unvollständige Keys werden entfernt, damit das Ergebnis valides JSON ist.
+ Unvollständige Keywords (true, false, null) werden vervollständigt.
Strategy:
1. Take the full truncated JSON
2. If we're in the middle of a string, close it
- 3. Remove incomplete key-value pairs (keys without values)
- 4. Close all open brackets/braces
+ 3. Complete incomplete keywords (tr → true, f → false, n → null)
+ 4. Remove incomplete key-value pairs (keys without values)
+ 5. Close all open brackets/braces
"""
result = self.jsonStr.rstrip()
@@ -344,6 +346,9 @@ class JsonAnalyzer:
stringClosing = self._getStringClosing(result)
result += stringClosing
+ # Complete incomplete keywords (true, false, null)
+ result = self._completeIncompleteKeywords(result)
+
# Check if we're in the middle of a key (after colon)
# If string was just closed and we're after a colon with no value, remove the key
result = self._cleanIncompleteKeyValue(result)
@@ -398,6 +403,85 @@ class JsonAnalyzer:
return jsonStr
+    def _completeIncompleteKeywords(self, jsonStr: str) -> str:
+        """
+        Complete a truncated JSON keyword at the end of the string.
+
+        Looks at the text following the last colon that lies outside of any
+        string literal:
+        - a value starting with t/f/n (any case) is replaced by the full
+          keyword true/false/null; letters directly following the first one
+          are treated as part of the partial keyword and dropped
+        - nothing after the colon -> "null" is appended
+        - anything else (string, number, bracket, ...) is left untouched
+
+        Args:
+            jsonStr: Truncated JSON text; trailing whitespace is ignored.
+
+        Returns:
+            The right-stripped text with the trailing keyword completed, or
+            the right-stripped text unchanged when there is nothing to do.
+        """
+        result = jsonStr.rstrip()
+
+        # Scan left to right: escape sequences can only be interpreted in
+        # reading direction. A backward scan mistakes an escaped quote (\")
+        # for a string delimiter and may pick up a colon that is actually
+        # part of string content.
+        inString = False
+        escaped = False
+        lastColonPos = -1
+        for i, char in enumerate(result):
+            if inString:
+                if escaped:
+                    escaped = False
+                elif char == '\\':
+                    escaped = True
+                elif char == '"':
+                    inString = False
+            elif char == '"':
+                inString = True
+            elif char == ':':
+                lastColonPos = i
+
+        if lastColonPos < 0:
+            return result
+
+        # Skip insignificant whitespace between the colon and the value
+        valueStart = lastColonPos + 1
+        while valueStart < len(result) and result[valueStart] in ' \t\n\r':
+            valueStart += 1
+
+        if valueStart >= len(result):
+            # Cut right after the colon -> use null as placeholder value
+            return result + 'null'
+
+        keyword = {'t': 'true', 'f': 'false', 'n': 'null'}.get(result[valueStart].lower())
+        if keyword is None:
+            return result
+
+        # Swallow the letters of the partial keyword, keep whatever follows
+        valueEnd = valueStart + 1
+        while valueEnd < len(result) and result[valueEnd].isalpha():
+            valueEnd += 1
+        return result[:valueStart] + keyword + result[valueEnd:]
+
def _isIncompleteKey(self, jsonStr: str) -> bool:
"""
Check if the last string in the JSON is an incomplete key in an object.
diff --git a/tests/functional/test12_json_split_merge.py b/tests/functional/test12_json_split_merge.py
index b36b93f2..d259b791 100644
--- a/tests/functional/test12_json_split_merge.py
+++ b/tests/functional/test12_json_split_merge.py
@@ -540,24 +540,34 @@ class JsonSplitMergeTester12:
self._log(" ✅ completePart is valid JSON")
self._log(f" Parsed type: {type(parsedCompletePart).__name__}")
- # Compare with original if possible
- if isinstance(parsedCompletePart, dict) and isinstance(originalData, dict):
- comparison = self.compareJson(originalData, parsedCompletePart)
- self._log(f" Comparison with original:")
- self._log(f" Exact match: {comparison['exactMatch']}")
- self._log(f" Size match: {comparison['sizeMatch']}")
- if comparison['differences']:
- self._log(f" Differences found: {len(comparison['differences'])}")
- for diff in comparison['differences'][:10]: # Show first 10 differences
- self._log(f" - {diff}")
- if len(comparison['differences']) > 10:
- self._log(f" ... ({len(comparison['differences']) - 10} more differences)")
+ # Compare with truncated JSON (not original) - parse the truncated part to compare
+ from modules.shared.jsonUtils import closeJsonStructures, tryParseJson
+
+ # Try to parse the truncated JSON part (with structures closed)
+ truncatedClosed = closeJsonStructures(partContent)
+ truncatedParsed, truncatedError, _ = tryParseJson(truncatedClosed)
+
+ if truncatedParsed is not None:
+ # Compare completePart with the parsed truncated JSON
+ if isinstance(parsedCompletePart, dict) and isinstance(truncatedParsed, dict):
+ comparison = self.compareJson(truncatedParsed, parsedCompletePart)
+ self._log(f" Comparison with truncated JSON (at cut position {cutPosition}):")
+ self._log(f" Exact match: {comparison['exactMatch']}")
+ self._log(f" Size match: {comparison['sizeMatch']}")
+ if comparison['differences']:
+ self._log(f" Differences found: {len(comparison['differences'])}")
+ for diff in comparison['differences'][:10]: # Show first 10 differences
+ self._log(f" - {diff}")
+ if len(comparison['differences']) > 10:
+ self._log(f" ... ({len(comparison['differences']) - 10} more differences)")
+ else:
+ self._log(" No differences found - completePart matches truncated JSON structure")
+ elif isinstance(parsedCompletePart, list) and isinstance(truncatedParsed, list):
+ self._log(f" Both are lists: truncated={len(truncatedParsed)} items, completePart={len(parsedCompletePart)} items")
else:
- self._log(" No differences found")
- elif isinstance(parsedCompletePart, list) and isinstance(originalData, list):
- self._log(f" Both are lists: original={len(originalData)} items, completePart={len(parsedCompletePart)} items")
+ self._log(f" Different types: truncated={type(truncatedParsed).__name__}, completePart={type(parsedCompletePart).__name__}")
else:
- self._log(f" Different types: original={type(originalData).__name__}, completePart={type(parsedCompletePart).__name__}")
+ self._log(f" Could not parse truncated JSON for comparison (error: {truncatedError})")
except json.JSONDecodeError as e:
isValidJson = False
diff --git a/tests/functional/test13_json_completion_cuts.py b/tests/functional/test13_json_completion_cuts.py
new file mode 100644
index 00000000..4ff05014
--- /dev/null
+++ b/tests/functional/test13_json_completion_cuts.py
@@ -0,0 +1,307 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+JSON Completion Test 13 - Tests JSON completion at various cut positions
+Tests a single pretty-printed JSON object (several hundred characters) with all JSON structure types.
+Cuts the JSON from character 50 onward in steps of 5 (plus the last position), completes each prefix, and validates it.
+"""
+
+import asyncio
+import json
+import sys
+import os
+from typing import Dict, Any, List
+
+# Add the gateway to path (go up 2 levels from tests/functional/)
+_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+if _gateway_path not in sys.path:
+ sys.path.insert(0, _gateway_path)
+
+# Import JSON continuation module
+from modules.shared.jsonContinuation import getContexts
+
+
+class JsonCompletionTester13:
+    def __init__(self):
+        """Start with empty results, an empty log buffer and no log file."""
+        # Path of the written log file; None as long as nothing was written
+        self.logFile = None
+        # Messages collected by _log(), in emission order
+        self.logBuffer: List[str] = []
+        # Result dictionary of the most recent runTest() call
+        self.testResults: Dict[str, Any] = {}
+
+    def createTestJson(self) -> str:
+        """
+        Build the pretty-printed JSON document used as test input.
+
+        The document is a single object (several hundred characters when
+        serialized with indent=2) containing all JSON structure types:
+        - Objects (nested)
+        - Arrays (nested)
+        - Strings
+        - Numbers (integers and floats)
+        - Booleans (true, false)
+        - null
+
+        Returns:
+            The document serialized with indent=2 and ensure_ascii=False.
+        """
+        testData = {
+            "id": 12345,
+            "name": "Test Object",
+            "active": True,
+            "inactive": False,
+            "value": None,
+            "price": 99.99,
+            "tags": ["tag1", "tag2", "tag3"],
+            "metadata": {
+                "created": "2025-01-01",
+                "updated": "2025-01-02",
+                "version": 1
+            },
+            "items": [
+                {"id": 1, "name": "Item A", "count": 10},
+                {"id": 2, "name": "Item B", "count": 20}
+            ],
+            "settings": {
+                "theme": "dark",
+                "notifications": True,
+                "features": ["feature1", "feature2"]
+            }
+        }
+
+        # No padding/trimming to a target length: the fixed payload is always
+        # far longer than the former 300-character target, so padding could
+        # never trigger and trimming only re-serialized the same data.
+        return json.dumps(testData, indent=2, ensure_ascii=False)
+
+    def _log(self, message: str):
+        """Remember *message* in the log buffer and echo it to stdout."""
+        buffer = self.logBuffer
+        buffer.append(message)
+        print(message)
+
+    async def testJsonCompletionAtCuts(self, jsonString: str, startPos: int = 50, step: int = 5) -> Dict[str, Any]:
+        """
+        Cut the JSON at regular positions, complete each prefix and validate it.
+
+        For every cut position the prefix jsonString[:cutPos] is passed to
+        getContexts(); a cut counts as successful when the returned
+        completePart can be parsed by json.loads().
+
+        Args:
+            jsonString: The full JSON string to test
+            startPos: Starting position for cuts (default 50)
+            step: Step size between cuts (default 5)
+
+        Returns:
+            Dictionary with totalCuts, successCount, failedCount, successRate
+            (in percent), the per-position results and the list of failed cut
+            positions. If jsonString is not longer than startPos, no cut is
+            tested and all counters are 0.
+        """
+        jsonLength = len(jsonString)
+        results = {}
+
+        self._log("")
+        self._log("="*80)
+        self._log("TESTING JSON COMPLETION AT VARIOUS CUT POSITIONS")
+        self._log("="*80)
+        self._log(f"JSON length: {jsonLength} characters")
+        self._log(f"Testing cuts from position {startPos} to {jsonLength} (step: {step})")
+        self._log("")
+
+        cutPositions = list(range(startPos, jsonLength, step))
+        # Always include the last position. An empty list (input not longer
+        # than startPos) is left empty instead of raising IndexError.
+        if cutPositions and cutPositions[-1] != jsonLength - 1:
+            cutPositions.append(jsonLength - 1)
+
+        successCount = 0
+        totalCuts = len(cutPositions)
+
+        for cutPos in cutPositions:
+            truncatedJson = jsonString[:cutPos]
+
+            try:
+                contexts = getContexts(truncatedJson)
+                completePart = contexts.completePart
+                overlapContext = contexts.overlapContext
+
+                # The cut passes if completePart parses as JSON
+                jsonError = None
+                try:
+                    json.loads(completePart)
+                    isValidJson = True
+                except json.JSONDecodeError as e:
+                    jsonError = str(e)
+                    isValidJson = False
+
+                # Negative slices already return the whole string when it is
+                # shorter than the requested tail, so no length check is needed.
+                results[cutPos] = {
+                    "cutPosition": cutPos,
+                    "truncatedLength": len(truncatedJson),
+                    "completePartLength": len(completePart),
+                    "overlapContextLength": len(overlapContext),
+                    "isValidJson": isValidJson,
+                    "jsonError": jsonError,
+                    "truncatedJson": truncatedJson[-50:],  # Last 50 chars
+                    "completePart": completePart[-100:],  # Last 100 chars
+                    "overlapContext": overlapContext[-100:]  # Last 100 chars
+                }
+
+                if isValidJson:
+                    successCount += 1
+                    self._log(f"✅ Cut at position {cutPos:4d}: Valid JSON (completePart length: {len(completePart)}, overlap length: {len(overlapContext)})")
+                    self._log(f" Overlap: {overlapContext[-80:]}")
+                else:
+                    self._log(f"❌ Cut at position {cutPos:4d}: Invalid JSON - {jsonError}")
+                    self._log(f" Truncated (last 50): {truncatedJson[-50:]}")
+                    self._log(f" CompletePart (last 100): {completePart[-100:]}")
+                    self._log(f" Overlap: {overlapContext[-80:]}")
+
+            except Exception as e:
+                # Deliberately broad: any failure inside the completion code
+                # is recorded as a failed cut so the remaining cuts still run.
+                results[cutPos] = {
+                    "cutPosition": cutPos,
+                    "truncatedLength": len(truncatedJson),
+                    "isValidJson": False,
+                    "jsonError": f"Exception: {str(e)}",
+                    "truncatedJson": truncatedJson[-50:]
+                }
+                self._log(f"❌ Cut at position {cutPos:4d}: Exception - {str(e)}")
+
+        failedCount = totalCuts - successCount
+        # Guard against division by zero when no cut position was tested
+        successRate = successCount / totalCuts * 100 if totalCuts > 0 else 0
+
+        # Summary
+        self._log("")
+        self._log("="*80)
+        self._log("CUT TEST SUMMARY")
+        self._log("="*80)
+        self._log(f"Total cuts tested: {totalCuts}")
+        self._log(f"Successful completions: {successCount}")
+        self._log(f"Failed completions: {failedCount}")
+        self._log(f"Success rate: {successRate:.1f}%")
+        self._log("")
+
+        # Detailed results for failed cuts
+        failedCuts = [pos for pos, res in results.items() if not res.get("isValidJson", False)]
+        if failedCuts:
+            self._log("Failed cuts:")
+            for pos in failedCuts[:10]:  # Show first 10 failures
+                res = results[pos]
+                self._log(f" Position {pos}: {res.get('jsonError', 'Unknown error')}")
+                # Results recorded after an exception carry no overlap context
+                overlap = res.get('overlapContext', 'N/A')
+                if overlap != 'N/A':
+                    self._log(f" Overlap: {overlap[-80:]}")
+            if len(failedCuts) > 10:
+                self._log(f" ... ({len(failedCuts) - 10} more failures)")
+
+        return {
+            "totalCuts": totalCuts,
+            "successCount": successCount,
+            "failedCount": failedCount,
+            "successRate": successRate,
+            "results": results,
+            "failedCuts": failedCuts
+        }
+
+    def _writeLogFile(self):
+        """
+        Write the collected log lines to the results file.
+
+        The file is placed in the local/debug directory three levels above
+        this file's directory (created if missing); its path is stored in
+        self.logFile.
+        """
+        targetDir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "debug")
+        os.makedirs(targetDir, exist_ok=True)
+        targetPath = os.path.join(targetDir, "test13_json_completion_cuts_results.txt")
+
+        content = '\n'.join(self.logBuffer)
+        with open(targetPath, 'w', encoding='utf-8') as handle:
+            handle.write(content)
+
+        self.logFile = targetPath
+        print(f"\n📝 Detailed log written to: {targetPath}")
+
+    async def runTest(self):
+        """
+        Run the complete test and return its result dictionary.
+
+        Builds the test document, checks the completion at the cut positions,
+        logs a final summary and only then writes the log file, so that the
+        file contains the final summary as well.
+
+        Returns:
+            On success a dictionary with success, totalCuts, successCount,
+            failedCount, successRate, failedCuts and results. If an exception
+            occurs, a dictionary with success=False, error and traceback.
+            The same dictionary is stored in self.testResults.
+        """
+        self._log("="*80)
+        self._log("JSON COMPLETION TEST 13")
+        self._log("="*80)
+
+        try:
+            # Create test JSON
+            jsonString = self.createTestJson()
+
+            self._log("")
+            self._log("="*80)
+            self._log("TEST JSON OBJECT")
+            self._log("="*80)
+            self._log(f"Length: {len(jsonString)} characters")
+            self._log("")
+            self._log("Full JSON content:")
+            self._log("-"*80)
+            for line in jsonString.split('\n'):
+                self._log(line)
+
+            # Test completion at various cuts
+            results = await self.testJsonCompletionAtCuts(jsonString, startPos=50, step=5)
+
+            # Final summary
+            self._log("")
+            self._log("="*80)
+            self._log("FINAL TEST SUMMARY")
+            self._log("="*80)
+            self._log(f"Total cuts tested: {results['totalCuts']}")
+            self._log(f"✅ Successful: {results['successCount']}")
+            self._log(f"❌ Failed: {results['failedCount']}")
+            self._log(f"Success rate: {results['successRate']:.1f}%")
+
+            if results['failedCuts']:
+                self._log("")
+                self._log("Failed cut positions:")
+                for pos in results['failedCuts']:
+                    res = results['results'][pos]
+                    self._log(f" Position {pos}: {res.get('jsonError', 'Unknown error')}")
+                    # Results recorded after an exception carry no overlap context
+                    overlap = res.get('overlapContext', 'N/A')
+                    if overlap != 'N/A':
+                        self._log(f" Overlap: {overlap[-80:]}")
+
+            # Write the log file last: everything logged after the write
+            # would be missing from the file.
+            self._writeLogFile()
+
+            self.testResults = {
+                "success": results['successCount'] == results['totalCuts'],
+                "totalCuts": results['totalCuts'],
+                "successCount": results['successCount'],
+                "failedCount": results['failedCount'],
+                "successRate": results['successRate'],
+                "failedCuts": results['failedCuts'],
+                "results": results['results']
+            }
+
+            return self.testResults
+
+        except Exception as e:
+            # Top-level boundary of the test run: report the failure in the
+            # result dictionary instead of letting the exception escape.
+            import traceback
+            details = traceback.format_exc()
+            print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
+            print(f"Traceback:\n{details}")
+            self.testResults = {
+                "success": False,
+                "error": str(e),
+                "traceback": details
+            }
+            return self.testResults
+
+
+async def main():
+    """Execute test 13 and print its result dictionary as JSON on stdout."""
+    outcome = await JsonCompletionTester13().runTest()
+
+    # Machine-readable result block at the very end of the output
+    separator = "="*80
+    print("\n" + separator)
+    print("FINAL RESULTS (JSON)")
+    print(separator)
+    print(json.dumps(outcome, indent=2, default=str))
+
+
+# Script entry point: run the async main() only when executed directly
+if __name__ == "__main__":
+    asyncio.run(main())