def _completeIncompleteKeywords(self, jsonStr: str) -> str:
    """
    Complete a truncated JSON keyword that follows the last key/value colon.

    The text is scanned once from the start so that string boundaries and
    backslash escapes are tracked correctly. A colon inside a string value
    is therefore never mistaken for a key/value separator (a backward scan
    cannot know whether a quote is escaped until it has already passed it).

    The value after the last structural colon is then inspected:

    - starts with t/T, f/F or n/N -> its alphabetic run is replaced by
      ``true`` / ``false`` / ``null``
    - nothing follows the colon    -> ``null`` is appended
    - anything else                -> the text is returned unchanged

    Keywords that are not directly preceded by a colon (for example array
    elements) are left untouched.

    Args:
        jsonStr: JSON text, possibly truncated. Trailing whitespace is ignored.

    Returns:
        The right-stripped text with the trailing keyword completed, if any.
    """
    result = jsonStr.rstrip()

    # Forward scan: remember the last colon seen while outside a string.
    last_colon_pos = -1
    in_string = False
    escaped = False
    for i, char in enumerate(result):
        if in_string:
            if escaped:
                # Character after a backslash is literal, whatever it is.
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char == ':':
            last_colon_pos = i

    if last_colon_pos < 0:
        return result

    # Skip JSON whitespace between the colon and the value.
    value_start = last_colon_pos + 1
    while value_start < len(result) and result[value_start] in ' \t\n\r':
        value_start += 1

    if value_start >= len(result):
        # Key without a value -> use null so the pair stays valid.
        return result + 'null'

    keywords = {
        't': 'true', 'T': 'true',
        'f': 'false', 'F': 'false',
        'n': 'null', 'N': 'null',
    }
    keyword = keywords.get(result[value_start])
    if keyword is None:
        # Value is a string, number, object or array: nothing to complete.
        return result

    # Replace the (possibly partial) alphabetic token with the full keyword.
    value_end = value_start + 1
    while value_end < len(result) and result[value_end].isalpha():
        value_end += 1
    return result[:value_start] + keyword + result[value_end:]
({len(comparison['differences']) - 10} more differences)") + # Compare with truncated JSON (not original) - parse the truncated part to compare + from modules.shared.jsonUtils import closeJsonStructures, tryParseJson + + # Try to parse the truncated JSON part (with structures closed) + truncatedClosed = closeJsonStructures(partContent) + truncatedParsed, truncatedError, _ = tryParseJson(truncatedClosed) + + if truncatedParsed is not None: + # Compare completePart with the parsed truncated JSON + if isinstance(parsedCompletePart, dict) and isinstance(truncatedParsed, dict): + comparison = self.compareJson(truncatedParsed, parsedCompletePart) + self._log(f" Comparison with truncated JSON (at cut position {cutPosition}):") + self._log(f" Exact match: {comparison['exactMatch']}") + self._log(f" Size match: {comparison['sizeMatch']}") + if comparison['differences']: + self._log(f" Differences found: {len(comparison['differences'])}") + for diff in comparison['differences'][:10]: # Show first 10 differences + self._log(f" - {diff}") + if len(comparison['differences']) > 10: + self._log(f" ... 
({len(comparison['differences']) - 10} more differences)") + else: + self._log(" No differences found - completePart matches truncated JSON structure") + elif isinstance(parsedCompletePart, list) and isinstance(truncatedParsed, list): + self._log(f" Both are lists: truncated={len(truncatedParsed)} items, completePart={len(parsedCompletePart)} items") else: - self._log(" No differences found") - elif isinstance(parsedCompletePart, list) and isinstance(originalData, list): - self._log(f" Both are lists: original={len(originalData)} items, completePart={len(parsedCompletePart)} items") + self._log(f" Different types: truncated={type(truncatedParsed).__name__}, completePart={type(parsedCompletePart).__name__}") else: - self._log(f" Different types: original={type(originalData).__name__}, completePart={type(parsedCompletePart).__name__}") + self._log(f" Could not parse truncated JSON for comparison (error: {truncatedError})") except json.JSONDecodeError as e: isValidJson = False diff --git a/tests/functional/test13_json_completion_cuts.py b/tests/functional/test13_json_completion_cuts.py new file mode 100644 index 00000000..4ff05014 --- /dev/null +++ b/tests/functional/test13_json_completion_cuts.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +JSON Completion Test 13 - Tests JSON completion at various cut positions +Tests a single JSON object (~300 chars) with all JSON structure types. +Cuts the JSON at every position from character 50 to the end, completes it, and validates. 
import asyncio
import json
import sys
import os
from typing import Any, Callable, Dict, List, Optional

# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)


class JsonCompletionTester13:
    """Cut a JSON document at many positions and check that each cut can be completed."""

    def __init__(self):
        self.testResults = {}
        self.logBuffer = []
        self.logFile = None

    def createTestJson(self) -> str:
        """
        Build the test document containing every JSON structure type.

        Covers nested objects, nested arrays, strings, integers, floats,
        booleans (true/false) and null. The document is serialized with
        ``indent=2`` and ``ensure_ascii=False``.

        Returns:
            The serialized JSON string.
        """
        testData = {
            "id": 12345,
            "name": "Test Object",
            "active": True,
            "inactive": False,
            "value": None,
            "price": 99.99,
            "tags": ["tag1", "tag2", "tag3"],
            "metadata": {
                "created": "2025-01-01",
                "updated": "2025-01-02",
                "version": 1,
            },
            "items": [
                {"id": 1, "name": "Item A", "count": 10},
                {"id": 2, "name": "Item B", "count": 20},
            ],
            "settings": {
                "theme": "dark",
                "notifications": True,
                "features": ["feature1", "feature2"],
            },
        }
        return json.dumps(testData, indent=2, ensure_ascii=False)

    def _log(self, message: str):
        """Append *message* to the log buffer and echo it to stdout."""
        self.logBuffer.append(message)
        print(message)

    async def testJsonCompletionAtCuts(
        self,
        jsonString: str,
        startPos: int = 50,
        step: int = 5,
        contextProvider: Optional[Callable[[str], Any]] = None,
    ) -> Dict[str, Any]:
        """
        Test JSON completion at various cut positions.

        Args:
            jsonString: The full JSON string to test
            startPos: Starting position for cuts (default 50)
            step: Step size between cuts (default 5)
            contextProvider: Callable that takes the truncated JSON and returns
                an object with ``completePart`` and ``overlapContext``
                attributes. Defaults to ``getContexts`` from
                ``modules.shared.jsonContinuation``.

        Returns:
            Dictionary with the totals, the success rate, the per-cut results
            (keyed by cut position) and the list of failed cut positions.
        """
        if contextProvider is None:
            # Imported lazily: the module stays loadable without the gateway
            # package, and the sys.path setup above has run by now.
            from modules.shared.jsonContinuation import getContexts
            contextProvider = getContexts

        jsonLength = len(jsonString)
        results: Dict[int, Dict[str, Any]] = {}

        self._log("")
        self._log("=" * 80)
        self._log("TESTING JSON COMPLETION AT VARIOUS CUT POSITIONS")
        self._log("=" * 80)
        self._log(f"JSON length: {jsonLength} characters")
        self._log(f"Testing cuts from position {startPos} to {jsonLength} (step: {step})")
        self._log("")

        cutPositions = list(range(startPos, jsonLength, step))
        # Always include the last position. The list is empty when the JSON is
        # not longer than startPos, so guard before indexing it.
        lastPos = jsonLength - 1
        if cutPositions and cutPositions[-1] != lastPos:
            cutPositions.append(lastPos)

        successCount = 0
        totalCuts = len(cutPositions)

        for cutPos in cutPositions:
            truncatedJson = jsonString[:cutPos]

            try:
                contexts = contextProvider(truncatedJson)
                completePart = contexts.completePart
                overlapContext = contexts.overlapContext

                # completePart must parse on its own to count as a success
                jsonError = None
                try:
                    json.loads(completePart)
                    isValidJson = True
                except json.JSONDecodeError as e:
                    jsonError = str(e)
                    isValidJson = False

                # Only the tails are stored; slicing already copes with short strings
                results[cutPos] = {
                    "cutPosition": cutPos,
                    "truncatedLength": len(truncatedJson),
                    "completePartLength": len(completePart),
                    "overlapContextLength": len(overlapContext),
                    "isValidJson": isValidJson,
                    "jsonError": jsonError,
                    "truncatedJson": truncatedJson[-50:],
                    "completePart": completePart[-100:],
                    "overlapContext": overlapContext[-100:],
                }

                if isValidJson:
                    successCount += 1
                    self._log(
                        f"✅ Cut at position {cutPos:4d}: Valid JSON "
                        f"(completePart length: {len(completePart)}, "
                        f"overlap length: {len(overlapContext)})"
                    )
                else:
                    self._log(f"❌ Cut at position {cutPos:4d}: Invalid JSON - {jsonError}")
                    self._log(f"   Truncated (last 50): {truncatedJson[-50:]}")
                    self._log(f"   CompletePart (last 100): {completePart[-100:]}")
                self._log(f"   Overlap: {overlapContext[-80:]}")

            except Exception as e:
                # A crash in the completion code is recorded as a failed cut so
                # the remaining positions are still exercised.
                results[cutPos] = {
                    "cutPosition": cutPos,
                    "truncatedLength": len(truncatedJson),
                    "isValidJson": False,
                    "jsonError": f"Exception: {str(e)}",
                    "truncatedJson": truncatedJson[-50:],
                }
                self._log(f"❌ Cut at position {cutPos:4d}: Exception - {str(e)}")

        # No cuts at all must not divide by zero
        successRate = successCount / totalCuts * 100 if totalCuts > 0 else 0

        self._log("")
        self._log("=" * 80)
        self._log("CUT TEST SUMMARY")
        self._log("=" * 80)
        self._log(f"Total cuts tested: {totalCuts}")
        self._log(f"Successful completions: {successCount}")
        self._log(f"Failed completions: {totalCuts - successCount}")
        self._log(f"Success rate: {successRate:.1f}%")
        self._log("")

        failedCuts = [pos for pos, res in results.items() if not res.get("isValidJson", False)]
        if failedCuts:
            self._log("Failed cuts:")
            for pos in failedCuts[:10]:  # Show first 10 failures
                res = results[pos]
                self._log(f"  Position {pos}: {res.get('jsonError', 'Unknown error')}")
                overlap = res.get('overlapContext', 'N/A')
                if overlap != 'N/A':
                    self._log(f"    Overlap: {overlap[-80:]}")
            if len(failedCuts) > 10:
                self._log(f"  ... ({len(failedCuts) - 10} more failures)")

        return {
            "totalCuts": totalCuts,
            "successCount": successCount,
            "failedCount": totalCuts - successCount,
            "successRate": successRate,
            "results": results,
            "failedCuts": failedCuts,
        }

    def _writeLogFile(self):
        """Write the log buffer to local/debug/test13_json_completion_cuts_results.txt."""
        logDir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "debug")
        os.makedirs(logDir, exist_ok=True)
        logFilePath = os.path.join(logDir, "test13_json_completion_cuts_results.txt")

        with open(logFilePath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(self.logBuffer))

        self.logFile = logFilePath
        print(f"\n📝 Detailed log written to: {logFilePath}")

    async def runTest(self):
        """
        Run the complete test.

        Returns:
            The result dictionary, also stored in ``self.testResults``. On an
            unexpected error it holds ``success=False``, the error text and the
            traceback.
        """
        self._log("=" * 80)
        self._log("JSON COMPLETION TEST 13")
        self._log("=" * 80)

        try:
            jsonString = self.createTestJson()

            self._log("")
            self._log("=" * 80)
            self._log("TEST JSON OBJECT")
            self._log("=" * 80)
            self._log(f"Length: {len(jsonString)} characters")
            self._log("")
            self._log("Full JSON content:")
            self._log("-" * 80)
            for line in jsonString.split('\n'):
                self._log(line)

            results = await self.testJsonCompletionAtCuts(jsonString, startPos=50, step=5)

            self._log("")
            self._log("=" * 80)
            self._log("FINAL TEST SUMMARY")
            self._log("=" * 80)
            self._log(f"Total cuts tested: {results['totalCuts']}")
            self._log(f"✅ Successful: {results['successCount']}")
            self._log(f"❌ Failed: {results['failedCount']}")
            self._log(f"Success rate: {results['successRate']:.1f}%")

            if results['failedCuts']:
                self._log("")
                self._log("Failed cut positions:")
                for pos in results['failedCuts']:
                    res = results['results'][pos]
                    self._log(f"  Position {pos}: {res.get('jsonError', 'Unknown error')}")
                    overlap = res.get('overlapContext', 'N/A')
                    if overlap != 'N/A':
                        self._log(f"    Overlap: {overlap[-80:]}")

            # Written last so the file also contains the final summary
            self._writeLogFile()

            self.testResults = {
                "success": results['successCount'] == results['totalCuts'],
                "totalCuts": results['totalCuts'],
                "successCount": results['successCount'],
                "failedCount": results['failedCount'],
                "successRate": results['successRate'],
                "failedCuts": results['failedCuts'],
                "results": results['results'],
            }
            return self.testResults

        except Exception as e:
            import traceback
            print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
            print(f"Traceback:\n{traceback.format_exc()}")
            self.testResults = {
                "success": False,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            return self.testResults


async def main():
    """Run JSON completion test 13."""
    tester = JsonCompletionTester13()
    results = await tester.runTest()

    # Print final results as JSON for easy parsing
    print("\n" + "=" * 80)
    print("FINAL RESULTS (JSON)")
    print("=" * 80)
    print(json.dumps(results, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())