2224 lines
80 KiB
Python
2224 lines
80 KiB
Python
"""
|
|
JSON Continuation Context Module
|
|
|
|
Generiert drei Kontexte für abgeschnittene JSON-Strings:
|
|
1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält
|
|
2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut mit Budget-Logik
|
|
3. Complete Part: Der vollständige Teil des JSONs mit allen Strukturen geschlossen
|
|
|
|
Hauptfunktionen:
|
|
- extractContinuationContexts(truncatedJson: str) -> Tuple[str, str, str]
|
|
Extrahiert alle drei Kontexte aus einem abgeschnittenen JSON-String.
|
|
|
|
- getContexts(truncatedJson: str) -> JsonContinuationContexts
|
|
Gibt alle Kontexte als Pydantic-Modell zurück mit benannten Feldern.
|
|
|
|
Modulkonstanten:
|
|
- BUDGET_LIMIT: int = 500
|
|
Zeichen-Budget für vollständige Datenwerte im Hierarchy Context
|
|
|
|
- OVERLAP_MAX_CHARS: int = 1000
|
|
Maximale Zeichen für den Overlap Context
|
|
|
|
Verwendung:
|
|
>>> from modules.shared.jsonContinuation import getContexts
|
|
>>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor'
|
|
>>> contexts = getContexts(jsonStr)
|
|
>>> print(contexts.overlapContext)
|
|
>>> print(contexts.hierarchyContext)
|
|
>>> print(contexts.completePart)
|
|
|
|
Autor: Claude
|
|
Version: 2.0
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import Tuple, List, Optional, Any, Set
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from modules.datamodels.datamodelAi import JsonContinuationContexts
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# MODULE CONSTANTS
|
|
# =============================================================================
|
|
|
|
# Default character budget for data values in the hierarchy context; used
# whenever a caller passes budgetLimit=None.
BUDGET_LIMIT: int = 2000
"""Zeichen-Budget für vollständige Datenwerte im Hierarchy Context"""

# Default maximum length, in characters, of the overlap context; used whenever
# a caller passes overlapMaxChars=None.
OVERLAP_MAX_CHARS: int = 1000
"""Maximale Zeichen für den Overlap Context"""
|
|
|
|
|
|
# =============================================================================
|
|
# TOKEN TYPES AND DATA CLASSES
|
|
# =============================================================================
|
|
|
|
|
|
class TokenType(Enum):
    """Kinds of token produced while scanning JSON text.

    Structural members use their literal character as value; all other
    members use a descriptive name.
    """
    # Structural characters
    OBJECT_START = "{"
    OBJECT_END = "}"
    ARRAY_START = "["
    ARRAY_END = "]"
    # Scalar values
    STRING = "string"
    NUMBER = "number"
    BOOLEAN = "boolean"
    NULL = "null"
    # Separators
    COLON = ":"
    COMMA = ","
    # NOTE(review): the tokenizer in this file reports object keys as STRING;
    # no code visible here emits KEY - confirm whether it is used elsewhere.
    KEY = "key"
    # End of input reached cleanly
    EOF = "eof"
    # Input ended inside a string/keyword, or an unexpected character was met
    TRUNCATED = "truncated"
|
|
|
|
|
|
@dataclass
class Token:
    """A single JSON token together with its position in the source text."""
    type: TokenType  # Kind of token
    value: Any  # Interpreted payload (e.g. string body without quotes, parsed number)
    start_pos: int  # Index of the token's first character
    end_pos: int  # Index just past the token's last character
    raw: str  # Original string representation
|
|
|
|
|
|
@dataclass
class StackFrame:
    """One open container (object or array) in the JSON hierarchy."""
    type: str  # "object" or "array"
    start_pos: int  # Position of the container's opening bracket/brace
    key: Optional[str] = None  # Current key for objects
    index: int = 0  # Current index for arrays
    content: str = ""  # Accumulated content for this frame
    keys_seen: Optional[List[str]] = None  # Keys seen in this object

    def __post_init__(self):
        """Give every frame its own key list when none was supplied."""
        # A literal [] default would be shared between instances, so the
        # None default is swapped for a fresh list here.
        if self.keys_seen is None:
            self.keys_seen = []
|
|
|
|
|
|
class JsonTokenizer:
    """Tokenizer for potentially truncated JSON strings.

    Keeps a cursor (``pos``) into ``jsonStr`` and hands out one ``Token`` per
    ``nextToken()`` call. Input that ends in the middle of a string or
    keyword, or that contains an unexpected character, produces a
    ``TokenType.TRUNCATED`` token instead of raising.
    """

    # Single-character tokens and the token type each one maps to.
    _STRUCTURAL = {
        '{': TokenType.OBJECT_START,
        '}': TokenType.OBJECT_END,
        '[': TokenType.ARRAY_START,
        ']': TokenType.ARRAY_END,
        ':': TokenType.COLON,
        ',': TokenType.COMMA,
    }

    # Literal keywords with their token type and decoded value.
    _KEYWORDS = (
        ('true', TokenType.BOOLEAN, True),
        ('false', TokenType.BOOLEAN, False),
        ('null', TokenType.NULL, None),
    )

    def __init__(self, jsonStr: str):
        """Start tokenizing ``jsonStr`` at position 0."""
        self.jsonStr = jsonStr
        self.pos = 0
        self.length = len(jsonStr)

    def skipWhitespace(self):
        """Advance the cursor past spaces, tabs, newlines and carriage returns."""
        while self.pos < self.length and self.jsonStr[self.pos] in ' \t\n\r':
            self.pos += 1

    def peek(self) -> Optional[str]:
        """Return the character under the cursor without consuming it, or None at end of input."""
        if self.pos < self.length:
            return self.jsonStr[self.pos]
        return None

    def readString(self) -> Token:
        """Read a string token starting at the opening quote under the cursor.

        Returns:
            A STRING token whose value is the text between the quotes (escape
            sequences are left undecoded), or a TRUNCATED token holding
            everything after the opening quote when the input ends before the
            closing quote.
        """
        start_pos = self.pos
        self.pos += 1  # Skip opening quote

        escaped = False
        while self.pos < self.length:
            char = self.jsonStr[self.pos]
            self.pos += 1
            if escaped:
                # Character following a backslash never terminates the string
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                raw = self.jsonStr[start_pos:self.pos]
                return Token(TokenType.STRING, raw[1:-1], start_pos, self.pos, raw)

        # String was truncated
        raw = self.jsonStr[start_pos:self.pos]
        return Token(TokenType.TRUNCATED, raw[1:], start_pos, self.pos, raw)

    def _skipDigits(self):
        """Advance the cursor past a run of digit characters."""
        while self.pos < self.length and self.jsonStr[self.pos].isdigit():
            self.pos += 1

    def readNumber(self) -> Token:
        """Read a number token (optional sign, digits, fraction, exponent).

        Returns:
            A NUMBER token. The value is an int or float when the consumed
            text converts cleanly; otherwise (e.g. a lone "-" or "1e" at the
            cut) the value is the raw text.
        """
        start_pos = self.pos

        # Handle negative
        if self.pos < self.length and self.jsonStr[self.pos] == '-':
            self.pos += 1

        # Integer part
        self._skipDigits()

        # Decimal part
        if self.pos < self.length and self.jsonStr[self.pos] == '.':
            self.pos += 1
            self._skipDigits()

        # Exponent
        if self.pos < self.length and self.jsonStr[self.pos] in 'eE':
            self.pos += 1
            if self.pos < self.length and self.jsonStr[self.pos] in '+-':
                self.pos += 1
            self._skipDigits()

        raw = self.jsonStr[start_pos:self.pos]
        try:
            value = float(raw) if '.' in raw or 'e' in raw.lower() else int(raw)
        except ValueError:
            value = raw

        return Token(TokenType.NUMBER, value, start_pos, self.pos, raw)

    def readKeyword(self) -> Token:
        """Read ``true``, ``false`` or ``null``.

        Returns:
            A BOOLEAN or NULL token for a complete keyword, or a TRUNCATED
            token covering the run of alphabetic characters under the cursor
            when no complete keyword starts there.
        """
        start_pos = self.pos

        for keyword, token_type, value in self._KEYWORDS:
            # startswith with an offset avoids copying the rest of the input
            if self.jsonStr.startswith(keyword, self.pos):
                self.pos += len(keyword)
                return Token(token_type, value, start_pos, self.pos, keyword)

        # Partial keyword (truncated)
        while self.pos < self.length and self.jsonStr[self.pos].isalpha():
            self.pos += 1
        raw = self.jsonStr[start_pos:self.pos]
        return Token(TokenType.TRUNCATED, raw, start_pos, self.pos, raw)

    def nextToken(self) -> Token:
        """Skip whitespace and return the next token.

        Returns:
            An EOF token at end of input; a structural, string, number or
            keyword token otherwise. Any other character is consumed and
            returned as a one-character TRUNCATED token.
        """
        self.skipWhitespace()

        if self.pos >= self.length:
            return Token(TokenType.EOF, None, self.pos, self.pos, "")

        char = self.jsonStr[self.pos]
        startPos = self.pos

        structuralType = self._STRUCTURAL.get(char)
        if structuralType is not None:
            self.pos += 1
            return Token(structuralType, char, startPos, self.pos, char)
        if char == '"':
            return self.readString()
        if char == '-' or char.isdigit():
            return self.readNumber()
        if char.isalpha():
            return self.readKeyword()

        # Unknown character, treat as truncated
        self.pos += 1
        return Token(TokenType.TRUNCATED, char, startPos, self.pos, char)
|
|
|
|
|
|
@dataclass
class HierarchyLevel:
    """Represents one level in the parsed hierarchy."""
    type: str  # "object" or "array"
    start_pos: int  # Position where this level starts
    end_pos: int  # -1 if not closed
    key: Optional[str]  # Key if this is a value in an object
    index: Optional[int]  # Index if this is in an array
    content: dict  # Parsed content at this level
    raw_start: str  # Raw string from start to children
    children_content: List[Any]  # For arrays: list of parsed elements
|
|
|
|
|
|
def getJsonContinuationContext(
    truncatedJson: str,
    budgetLimit: Optional[int] = None,
    overlapMaxChars: Optional[int] = None
) -> Tuple[str, str, str, str]:
    """
    Generate continuation contexts for a truncated JSON string.

    Four contexts are produced:
    1. Overlap Context: the innermost object/array element containing the cut point
    2. Hierarchy Context: the structure from root to cut WITHOUT budget limits (internal use)
    3. Hierarchy Context For Prompt: the structure from root to cut WITH budget limits (prompts)
    4. Complete Part: the complete part of the JSON with all structures closed

    Args:
        truncatedJson: The truncated JSON string
        budgetLimit: Character budget for data values in hierarchy context (uses BUDGET_LIMIT if None)
        overlapMaxChars: Maximum characters for overlap context (uses OVERLAP_MAX_CHARS if None)

    Returns:
        Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart):
        - overlapContext: The innermost object/element containing the cut (for merging)
        - hierarchyContext: Full structure from root to cut WITHOUT budget limitations (for internal use)
        - hierarchyContextForPrompt: Full structure from root to cut WITH budget limitations (for prompts)
        - completePart: Valid JSON with all structures properly closed
    """
    # Fall back to the module defaults for any limit the caller left unset
    effectiveBudget = BUDGET_LIMIT if budgetLimit is None else budgetLimit
    effectiveOverlap = OVERLAP_MAX_CHARS if overlapMaxChars is None else overlapMaxChars

    return JsonAnalyzer(truncatedJson, effectiveBudget, effectiveOverlap).analyze()
|
|
|
|
|
|
@dataclass
class BudgetAllocation:
    """Tracks which nodes have been allocated budget."""
    # Ids of nodes that received budget.
    # NOTE(review): presumably id() values of structure nodes - confirm against the renderer.
    allocated_node_ids: Set[int] = field(default_factory=set)
    # NOTE(review): presumably ids of nodes on the root-to-cut path - confirm against the renderer.
    path_node_ids: Set[int] = field(default_factory=set)
    # Off by default; its effect is defined by code outside this view.
    summary_mode: bool = False
|
|
|
|
|
|
class JsonAnalyzer:
|
|
"""
|
|
Analyzes truncated JSON and generates continuation contexts.
|
|
|
|
Generates three contexts for truncated JSON strings:
|
|
1. Overlap Context: The innermost object/array element containing the cut point
|
|
2. Hierarchy Context: The hierarchical structure from root to cut with budget logic
|
|
3. Complete Part: The complete part of the JSON with all structures properly closed
|
|
"""
|
|
|
|
def __init__(self, jsonStr: str, budgetLimit: Optional[int] = None, overlapMaxChars: Optional[int] = None):
|
|
self.jsonStr = jsonStr
|
|
self.budgetLimit = budgetLimit if budgetLimit is not None else BUDGET_LIMIT
|
|
self.overlapMaxChars = overlapMaxChars if overlapMaxChars is not None else OVERLAP_MAX_CHARS
|
|
self.stack: List[StackFrame] = []
|
|
self.hierarchy: List[dict] = [] # Parsed hierarchy info
|
|
|
|
def analyze(self) -> Tuple[str, str, str, str]:
    """
    Analyze the truncated JSON and return all four contexts.

    Returns:
        Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart)
    """
    # Parse and track the structure
    self._parseStructure()

    # Generate overlap context
    overlapContext = self._generateOverlapContext()

    # Parse structure for hierarchy (needed for both contexts)
    structure = self._parseForHierarchy()
    cutPos = len(self.jsonStr)

    # Build both hierarchy contexts from the SAME structure BEFORE generating complete part
    # CRITICAL: hierarchyContext must be the EXACT original JSON (for merge overlap detection!)
    # The rendered version would have different formatting, breaking overlap matching
    hierarchyContext = self.jsonStr

    # Generate hierarchy context WITH budget (for prompts) - uses same structure
    hierarchyContextForPrompt = self._renderWithBudgetFromStructure(structure, cutPos)

    # Generate complete part (JSON with all structures closed)
    completePart = self._generateCompletePart()

    return overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart
|
|
|
|
def _generateCompletePart(self) -> str:
|
|
"""
|
|
Generate the complete part of the JSON with all structures properly closed.
|
|
|
|
This creates valid JSON by closing all open strings, brackets/braces.
|
|
Unvollständige Keys werden entfernt, damit das Ergebnis valides JSON ist.
|
|
Unvollständige Keywords (true, false, null) werden vervollständigt.
|
|
|
|
Strategy:
|
|
1. Take the full truncated JSON
|
|
2. If we're in the middle of a string, close it
|
|
3. Complete incomplete keywords (tr → true, f → false, n → null)
|
|
4. Remove incomplete key-value pairs (keys without values)
|
|
5. Close all open brackets/braces
|
|
"""
|
|
result = self.jsonStr.rstrip()
|
|
|
|
# Remove trailing comma if present (after stripping)
|
|
if result.endswith(','):
|
|
result = result[:-1]
|
|
|
|
# Check if we need to close an open string
|
|
stringClosing = self._getStringClosing(result)
|
|
result += stringClosing
|
|
|
|
# Complete incomplete keywords (true, false, null)
|
|
result = self._completeIncompleteKeywords(result)
|
|
|
|
# Check if we're in the middle of a key (after colon)
|
|
# If string was just closed and we're after a colon with no value, remove the key
|
|
result = self._cleanIncompleteKeyValue(result)
|
|
|
|
# Close all open structures
|
|
closingBrackets = self._getClosingBrackets(result)
|
|
|
|
return result + closingBrackets
|
|
|
|
def _getStringClosing(self, jsonStr: str) -> str:
|
|
"""Check if there's an unclosed string and return closing quote if needed."""
|
|
in_string = False
|
|
escaped = False
|
|
|
|
for char in jsonStr:
|
|
if escaped:
|
|
escaped = False
|
|
continue
|
|
|
|
if char == '\\' and in_string:
|
|
escaped = True
|
|
continue
|
|
|
|
if char == '"':
|
|
in_string = not in_string
|
|
|
|
return '"' if in_string else ""
|
|
|
|
def _cleanIncompleteKeyValue(self, jsonStr: str) -> str:
|
|
"""
|
|
Clean up incomplete key-value pairs.
|
|
Handles cases like:
|
|
- {"key": "incompl -> keep (valid truncated value)
|
|
- {"key": -> remove key
|
|
- {"a": 1, "key -> remove incomplete key (was in middle of key name)
|
|
"""
|
|
stripped = jsonStr.rstrip()
|
|
|
|
# Pattern: ends with colon (possibly with whitespace) - incomplete value
|
|
if stripped.endswith(':'):
|
|
# Find the start of this key and remove the whole key-value
|
|
return self._removeLastKey(stripped)
|
|
|
|
# Check if we just closed a string that was an incomplete key
|
|
# Pattern: ..., "something" or { "something" where something has no colon after
|
|
# This happens when we close a truncated key name like "add" -> "add"
|
|
if stripped.endswith('"'):
|
|
# Look for the pattern: comma/bracket + whitespace + "string"
|
|
# and check if this was supposed to be a key
|
|
if self._isIncompleteKey(stripped):
|
|
return self._removeLastKey(stripped)
|
|
|
|
return jsonStr
|
|
|
|
def _completeIncompleteKeywords(self, jsonStr: str) -> str:
|
|
"""
|
|
Complete incomplete JSON keywords at the end of the string.
|
|
|
|
Checks the last element for incomplete keywords after colon:
|
|
- ": t*" or ": f*" or ": n*" -> complete to true/false/null
|
|
- ": " or ":" (without keyword) -> set to null
|
|
"""
|
|
result = jsonStr.rstrip()
|
|
|
|
# Find the last colon (not in string)
|
|
in_string = False
|
|
escaped = False
|
|
last_colon_pos = -1
|
|
|
|
for i in range(len(result) - 1, -1, -1):
|
|
char = result[i]
|
|
|
|
if escaped:
|
|
escaped = False
|
|
continue
|
|
|
|
if char == '\\' and in_string:
|
|
escaped = True
|
|
continue
|
|
|
|
if char == '"':
|
|
in_string = not in_string
|
|
continue
|
|
|
|
if not in_string and char == ':':
|
|
last_colon_pos = i
|
|
break
|
|
|
|
if last_colon_pos < 0:
|
|
return result
|
|
|
|
# Get text after the last colon
|
|
after_colon = result[last_colon_pos + 1:].strip()
|
|
|
|
# Check for incomplete keyword patterns
|
|
if after_colon.startswith('t') or after_colon.startswith('T'):
|
|
# Incomplete true
|
|
keyword_start = last_colon_pos + 1
|
|
# Skip whitespace
|
|
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
|
|
keyword_start += 1
|
|
# Remove partial keyword
|
|
keyword_end = keyword_start + 1
|
|
while keyword_end < len(result) and result[keyword_end].isalpha():
|
|
keyword_end += 1
|
|
return result[:keyword_start] + 'true' + result[keyword_end:]
|
|
|
|
elif after_colon.startswith('f') or after_colon.startswith('F'):
|
|
# Incomplete false
|
|
keyword_start = last_colon_pos + 1
|
|
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
|
|
keyword_start += 1
|
|
keyword_end = keyword_start + 1
|
|
while keyword_end < len(result) and result[keyword_end].isalpha():
|
|
keyword_end += 1
|
|
return result[:keyword_start] + 'false' + result[keyword_end:]
|
|
|
|
elif after_colon.startswith('n') or after_colon.startswith('N'):
|
|
# Incomplete null
|
|
keyword_start = last_colon_pos + 1
|
|
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
|
|
keyword_start += 1
|
|
keyword_end = keyword_start + 1
|
|
while keyword_end < len(result) and result[keyword_end].isalpha():
|
|
keyword_end += 1
|
|
return result[:keyword_start] + 'null' + result[keyword_end:]
|
|
|
|
elif not after_colon or after_colon == '':
|
|
# No keyword after colon -> set to null
|
|
return result + 'null'
|
|
|
|
return result
|
|
|
|
def _isIncompleteKey(self, jsonStr: str) -> bool:
|
|
"""
|
|
Check if the last string in the JSON is an incomplete key in an object.
|
|
This happens when truncation occurred in the middle of a key name.
|
|
Only applies to objects, not arrays.
|
|
"""
|
|
# Find the last complete string
|
|
pos = len(jsonStr) - 1
|
|
if jsonStr[pos] != '"':
|
|
return False
|
|
|
|
# Find the opening quote of this string
|
|
stringStart = pos - 1
|
|
while stringStart >= 0:
|
|
if jsonStr[stringStart] == '"':
|
|
# Check it's not escaped
|
|
numBackslashes = 0
|
|
checkPos = stringStart - 1
|
|
while checkPos >= 0 and jsonStr[checkPos] == '\\':
|
|
numBackslashes += 1
|
|
checkPos -= 1
|
|
if numBackslashes % 2 == 0:
|
|
break
|
|
stringStart -= 1
|
|
|
|
if stringStart < 0:
|
|
return False
|
|
|
|
# Now stringStart points to opening quote
|
|
# Check what's before it (skip whitespace)
|
|
beforePos = stringStart - 1
|
|
while beforePos >= 0 and jsonStr[beforePos] in ' \t\n\r':
|
|
beforePos -= 1
|
|
|
|
if beforePos < 0:
|
|
return False
|
|
|
|
# For this to be an incomplete key, it must be preceded by { or ,
|
|
# AND we must be inside an object (not an array)
|
|
if jsonStr[beforePos] not in ',{':
|
|
return False
|
|
|
|
# Now check if we're in an object context (not array)
|
|
# Count open braces/brackets to determine context
|
|
braceCount = 0
|
|
bracketCount = 0
|
|
inString = False
|
|
|
|
for i in range(beforePos + 1):
|
|
char = jsonStr[i]
|
|
if char == '"' and (i == 0 or jsonStr[i-1] != '\\'):
|
|
inString = not inString
|
|
elif not inString:
|
|
if char == '{':
|
|
braceCount += 1
|
|
elif char == '}':
|
|
braceCount -= 1
|
|
elif char == '[':
|
|
bracketCount += 1
|
|
elif char == ']':
|
|
bracketCount -= 1
|
|
|
|
# If we have more open braces than brackets at this point,
|
|
# we're in an object context
|
|
# Actually, we need to check the innermost container
|
|
# Let's track the stack properly
|
|
stack = []
|
|
inString = False
|
|
|
|
for i in range(beforePos + 1):
|
|
char = jsonStr[i]
|
|
if char == '"' and (i == 0 or jsonStr[i-1] != '\\'):
|
|
inString = not inString
|
|
elif not inString:
|
|
if char == '{':
|
|
stack.append('object')
|
|
elif char == '[':
|
|
stack.append('array')
|
|
elif char == '}':
|
|
if stack and stack[-1] == 'object':
|
|
stack.pop()
|
|
elif char == ']':
|
|
if stack and stack[-1] == 'array':
|
|
stack.pop()
|
|
|
|
# If innermost container is an object, this is an incomplete key
|
|
return len(stack) > 0 and stack[-1] == 'object'
|
|
|
|
def _removeLastKey(self, jsonStr: str) -> str:
|
|
"""Remove the last incomplete key-value pair from the JSON string."""
|
|
stripped = jsonStr.rstrip()
|
|
|
|
# Find the last comma or opening bracket before the incomplete key
|
|
pos = len(stripped) - 1
|
|
|
|
# Skip past the current string/key
|
|
in_string = False
|
|
while pos >= 0:
|
|
char = stripped[pos]
|
|
if char == '"' and (pos == 0 or stripped[pos-1] != '\\'):
|
|
in_string = not in_string
|
|
if not in_string and char in ',{':
|
|
break
|
|
pos -= 1
|
|
|
|
if pos < 0:
|
|
return stripped
|
|
|
|
if stripped[pos] == ',':
|
|
# Remove from comma onwards
|
|
return stripped[:pos]
|
|
elif stripped[pos] == '{':
|
|
# Keep the opening brace
|
|
return stripped[:pos+1]
|
|
|
|
return stripped
|
|
|
|
def _findLastCompletePosition(self) -> int:
    """Find the position of the last complete value in the JSON.

    Returns:
        The end position of the last complete value seen before the end of
        input or the first truncated token; the length of the input when no
        such value ended past position 0.
    """
    tokenizer = JsonTokenizer(self.jsonStr)
    last_value_end = 0

    # Token types that always mark the end of a complete value
    value_end_types = (
        TokenType.OBJECT_END,
        TokenType.ARRAY_END,
        TokenType.NUMBER,
        TokenType.BOOLEAN,
        TokenType.NULL,
    )

    while True:
        token = tokenizer.nextToken()

        # Stop at end of input, or right before the truncated part
        if token.type in (TokenType.EOF, TokenType.TRUNCATED):
            break

        if token.type in value_end_types:
            last_value_end = token.end_pos

        elif token.type == TokenType.STRING:
            # A string followed by a colon is a key, not a value; look ahead
            # and then put the cursor back.
            saved_pos = tokenizer.pos
            tokenizer.skipWhitespace()
            next_char = tokenizer.peek()
            tokenizer.pos = saved_pos

            if next_char != ':':
                last_value_end = token.end_pos

    return last_value_end if last_value_end > 0 else len(self.jsonStr)
|
|
|
|
def _getClosingBrackets(self, jsonStr: str) -> str:
|
|
"""Determine what closing brackets are needed."""
|
|
stack = []
|
|
in_string = False
|
|
escaped = False
|
|
|
|
for char in jsonStr:
|
|
if escaped:
|
|
escaped = False
|
|
continue
|
|
|
|
if char == '\\' and in_string:
|
|
escaped = True
|
|
continue
|
|
|
|
if char == '"':
|
|
in_string = not in_string
|
|
continue
|
|
|
|
if in_string:
|
|
continue
|
|
|
|
if char == '{':
|
|
stack.append('}')
|
|
elif char == '[':
|
|
stack.append(']')
|
|
elif char == '}':
|
|
if stack and stack[-1] == '}':
|
|
stack.pop()
|
|
elif char == ']':
|
|
if stack and stack[-1] == ']':
|
|
stack.pop()
|
|
|
|
# Return closing brackets in reverse order
|
|
return ''.join(reversed(stack))
|
|
|
|
def _parseStructure(self):
    """Tokenize the input and leave every still-open container on ``self.stack``.

    Parsing stops at end of input or at the first truncated token.
    """
    tokenizer = JsonTokenizer(self.jsonStr)
    stopTypes = (TokenType.EOF, TokenType.TRUNCATED)

    token = tokenizer.nextToken()
    while token.type not in stopTypes:
        kind = token.type
        top = self.stack[-1] if self.stack else None

        if kind == TokenType.OBJECT_START:
            self.stack.append(StackFrame(
                type="object",
                start_pos=token.start_pos,
                keys_seen=[]
            ))
        elif kind == TokenType.ARRAY_START:
            self.stack.append(StackFrame(
                type="array",
                start_pos=token.start_pos,
                index=0
            ))
        elif kind == TokenType.OBJECT_END:
            # A closer that does not match the innermost container is ignored
            if top is not None and top.type == "object":
                self.stack.pop()
        elif kind == TokenType.ARRAY_END:
            if top is not None and top.type == "array":
                self.stack.pop()
        elif kind == TokenType.STRING:
            # Could be a key or a value
            self._handleStringToken(token, tokenizer)
        elif kind == TokenType.COMMA:
            # A comma inside an array moves on to the next element
            if top is not None and top.type == "array":
                top.index += 1

        token = tokenizer.nextToken()
|
|
|
|
def _handleStringToken(self, token: Token, tokenizer: JsonTokenizer):
    """Record a string token as the current key when it is followed by a colon.

    Only strings met while the innermost container is an object are
    considered. The tokenizer's position is restored after the look-ahead.
    """
    if not self.stack:
        return
    frame = self.stack[-1]
    if frame.type != "object":
        return

    # Look ahead past whitespace, then rewind
    resumeAt = tokenizer.pos
    tokenizer.skipWhitespace()
    followedByColon = tokenizer.peek() == ':'
    tokenizer.pos = resumeAt

    if followedByColon:
        frame.key = token.value
        frame.keys_seen.append(token.value)
|
|
|
|
def _generateOverlapContext(self) -> str:
|
|
"""
|
|
Generate the overlap context - the innermost object/array element containing the cut.
|
|
|
|
Returns the raw string from the start of that element to the end of the truncated JSON.
|
|
Dieser Kontext wird verwendet, um den abgeschnittenen Teil mit dem neuen Teil zu mergen.
|
|
Exakt so wie im Original-String (für String-Matching beim Merge).
|
|
|
|
SPECIAL CASE: If cut point is within a list element, return the entire list object (from opening bracket).
|
|
"""
|
|
if not self.stack:
|
|
# No structure, return last overlap_max_chars characters
|
|
return self.jsonStr[-self.overlapMaxChars:]
|
|
|
|
# Find the innermost container that should be the overlap
|
|
innermost = self.stack[-1]
|
|
|
|
# SPECIAL CASE: If innermost is an array, return the entire array (from opening bracket)
|
|
if innermost.type == "array":
|
|
overlap_start = innermost.start_pos
|
|
else:
|
|
# For objects, use the standard logic
|
|
overlap_start = self._findInnermostElementStart()
|
|
|
|
overlap = self.jsonStr[overlap_start:]
|
|
|
|
# Apply max chars limit
|
|
if len(overlap) > self.overlapMaxChars:
|
|
overlap = self.jsonStr[-self.overlapMaxChars:]
|
|
|
|
return overlap
|
|
|
|
def _findAllArrayElementStarts(self, arrayFrame: StackFrame) -> List[int]:
|
|
"""Find all element start positions in an array"""
|
|
arrayContent = self.jsonStr[arrayFrame.start_pos:]
|
|
|
|
# Skip the opening bracket and whitespace
|
|
pos = 1
|
|
while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r':
|
|
pos += 1
|
|
|
|
elementStarts = [arrayFrame.start_pos + pos]
|
|
depth = 0
|
|
inString = False
|
|
escaped = False
|
|
|
|
i = pos
|
|
while i < len(arrayContent):
|
|
char = arrayContent[i]
|
|
|
|
if escaped:
|
|
escaped = False
|
|
i += 1
|
|
continue
|
|
|
|
if char == '\\' and inString:
|
|
escaped = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == '"':
|
|
inString = not inString
|
|
i += 1
|
|
continue
|
|
|
|
if inString:
|
|
i += 1
|
|
continue
|
|
|
|
if char in '{[':
|
|
depth += 1
|
|
elif char in '}]':
|
|
depth -= 1
|
|
elif char == ',' and depth == 0:
|
|
# Found element boundary
|
|
i += 1
|
|
# Skip whitespace
|
|
while i < len(arrayContent) and arrayContent[i] in ' \t\n\r':
|
|
i += 1
|
|
elementStarts.append(arrayFrame.start_pos + i)
|
|
|
|
i += 1
|
|
|
|
return elementStarts
|
|
|
|
def _findInnermostElementStart(self) -> int:
|
|
"""Find the start position of the innermost element for overlap"""
|
|
if not self.stack:
|
|
return max(0, len(self.jsonStr) - self.overlapMaxChars)
|
|
|
|
# Walk through stack to find the innermost array element or object
|
|
# We want the innermost "atomic" unit that contains the cut
|
|
|
|
# Strategy:
|
|
# - If innermost is an object: return its start
|
|
# - If innermost is an array:
|
|
# - If current element is an object/array: return start of that element
|
|
# - If current element is a primitive: return start of array or last N chars
|
|
|
|
innermost = self.stack[-1]
|
|
|
|
if innermost.type == "object":
|
|
return innermost.start_pos
|
|
else:
|
|
# It's an array - find the start of the current element
|
|
element_start = self._findArrayElementStart(innermost)
|
|
|
|
# Check if the element is a primitive or complex type
|
|
element_content = self.jsonStr[element_start:].strip()
|
|
|
|
# If it starts with { or [ it's complex, return the element start
|
|
if element_content and element_content[0] in '{[':
|
|
return element_start
|
|
else:
|
|
# Primitive in array - check if there's a parent object
|
|
# or return overlap_max_chars from end
|
|
for i in range(len(self.stack) - 2, -1, -1):
|
|
if self.stack[i].type == "object":
|
|
return self.stack[i].start_pos
|
|
|
|
# No parent object, return max chars from end
|
|
return max(0, len(self.jsonStr) - self.overlapMaxChars)
|
|
|
|
def _findArrayElementStart(self, arrayFrame: StackFrame) -> int:
|
|
"""Find the start position of the current array element"""
|
|
# We need to find the start of the current element in the array
|
|
# Parse from array start to find element boundaries
|
|
|
|
arrayContent = self.jsonStr[arrayFrame.start_pos:]
|
|
|
|
# Skip the opening bracket and whitespace
|
|
pos = 1
|
|
while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r':
|
|
pos += 1
|
|
|
|
elementStarts = [arrayFrame.start_pos + pos]
|
|
depth = 0
|
|
inString = False
|
|
escaped = False
|
|
|
|
i = pos
|
|
while i < len(arrayContent):
|
|
char = arrayContent[i]
|
|
|
|
if escaped:
|
|
escaped = False
|
|
i += 1
|
|
continue
|
|
|
|
if char == '\\' and inString:
|
|
escaped = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == '"':
|
|
inString = not inString
|
|
i += 1
|
|
continue
|
|
|
|
if inString:
|
|
i += 1
|
|
continue
|
|
|
|
if char in '{[':
|
|
depth += 1
|
|
elif char in '}]':
|
|
depth -= 1
|
|
elif char == ',' and depth == 0:
|
|
# Found element boundary
|
|
i += 1
|
|
# Skip whitespace
|
|
while i < len(arrayContent) and arrayContent[i] in ' \t\n\r':
|
|
i += 1
|
|
elementStarts.append(arrayFrame.start_pos + i)
|
|
|
|
i += 1
|
|
|
|
# Return the start of the current element
|
|
if arrayFrame.index < len(elementStarts):
|
|
return elementStarts[arrayFrame.index]
|
|
elif elementStarts:
|
|
return elementStarts[-1]
|
|
else:
|
|
return arrayFrame.start_pos
|
|
|
|
def _generateHierarchyContext(self) -> str:
|
|
"""
|
|
Generate the hierarchy context with budget logic.
|
|
Shows structure from root to cut point with data values limited by budget.
|
|
"""
|
|
if not self.stack:
|
|
# No structure
|
|
return self.jsonStr[-self.overlapMaxChars:]
|
|
|
|
# We need to rebuild the JSON with budget logic
|
|
# Priority: elements closer to cut get full values, distant ones get "..."
|
|
|
|
return self._rebuildWithBudget()
|
|
|
|
def _rebuildWithBudget(self) -> str:
    """Rebuild JSON from root to cut with budget constraints.

    Two steps: parse the input into a structured representation, then hand
    that representation to the budget-aware renderer.
    """
    # Rendering replaces values outside the budget with "..."
    return self._renderWithBudget(self._parseForHierarchy())
|
|
|
|
def _parseForHierarchy(self) -> dict:
    """
    Tokenize ``self.jsonStr`` and build the node tree used for hierarchy rendering.

    Node shapes (plain dicts):
        - root:            ``type``, ``children``, ``raw_positions``
        - object / array:  ``type``, ``key``, ``children``, ``start_pos`` and,
                           once their closing token was seen, ``end_pos`` and
                           ``complete``
        - value:           ``type``, ``key``, ``value``, ``raw``, ``start_pos``,
                           ``end_pos``, ``value_type``
        - truncated_value: ``type``, ``raw``, ``start_pos`` (plus ``key`` when
                           the parent is an object)

    Parsing stops at the EOF token or at the first TRUNCATED token.

    Returns:
        The root node of the tree.
    """
    root = {
        'type': 'root',
        'children': [],
        'raw_positions': []
    }

    tokenizer = JsonTokenizer(self.jsonStr)
    openNodes = [root]   # containers that are still open, innermost last
    pendingKey = None    # object key waiting for its value

    def makeValueNode(token, key, valueType):
        """Build the dict describing one complete scalar value."""
        return {
            'type': 'value',
            'key': key,
            'value': token.value,
            'raw': token.raw,
            'start_pos': token.start_pos,
            'end_pos': token.end_pos,
            'value_type': valueType
        }

    while True:
        token = tokenizer.nextToken()
        kind = token.type

        if kind == TokenType.EOF:
            break

        if kind == TokenType.TRUNCATED:
            # Record the cut-off remainder on the innermost open container.
            if openNodes:
                parent = openNodes[-1]
                parentType = parent.get('type')
                if parentType == 'object' and pendingKey:
                    parent['children'].append({
                        'type': 'truncated_value',
                        'key': pendingKey,
                        'raw': self.jsonStr[token.start_pos:],
                        'start_pos': token.start_pos
                    })
                elif parentType == 'array':
                    parent['children'].append({
                        'type': 'truncated_value',
                        'raw': self.jsonStr[token.start_pos:],
                        'start_pos': token.start_pos
                    })
            break

        if kind in (TokenType.OBJECT_START, TokenType.ARRAY_START):
            container = {
                'type': 'object' if kind == TokenType.OBJECT_START else 'array',
                'key': pendingKey,
                'children': [],
                'start_pos': token.start_pos
            }
            if openNodes:
                openNodes[-1]['children'].append(container)
            openNodes.append(container)
            pendingKey = None
            continue

        if kind in (TokenType.OBJECT_END, TokenType.ARRAY_END):
            expectedType = 'object' if kind == TokenType.OBJECT_END else 'array'
            # The root is never popped; a mismatching closer is ignored.
            if len(openNodes) > 1 and openNodes[-1].get('type') == expectedType:
                closed = openNodes.pop()
                closed['end_pos'] = token.end_pos
                closed['complete'] = True
            continue

        if kind == TokenType.STRING:
            # Peek past whitespace for ':' to tell a key from a value, then
            # restore the tokenizer position so the ':' is tokenized normally.
            resumePos = tokenizer.pos
            tokenizer.skipWhitespace()
            followedByColon = tokenizer.peek() == ':'

            if followedByColon and openNodes and openNodes[-1].get('type') == 'object':
                pendingKey = token.value
            else:
                stringNode = makeValueNode(token, pendingKey, 'string')
                if openNodes:
                    openNodes[-1]['children'].append(stringNode)
                pendingKey = None

            tokenizer.pos = resumePos
            continue

        if kind in (TokenType.NUMBER, TokenType.BOOLEAN, TokenType.NULL):
            scalarNode = makeValueNode(token, pendingKey, str(token.type.value))
            if openNodes:
                openNodes[-1]['children'].append(scalarNode)
            pendingKey = None

    return root
|
|
|
|
def _renderWithBudget(self, structure: dict) -> str:
    """
    Render the parsed structure, giving full text to the values closest to the cut.

    Every ``value`` / ``truncated_value`` node is ranked by its distance to the
    end of ``self.jsonStr``. Walking that ranking, a value is granted full
    rendering while its raw length still fits into the remaining
    ``self.budgetLimit``; the ids of the granted nodes are passed on to
    ``_renderNode``.

    Args:
        structure: Root node as produced by ``_parseForHierarchy``.

    Returns:
        The rendered text.
    """
    cutPos = len(self.jsonStr)

    # The candidates are gathered here instead of through
    # self._collectValuesWithDistance: the class binds that name twice and the
    # later definition takes (node, values, cutPos), so calling it as
    # (structure, cutPos) raises TypeError.
    candidates = []
    pending = [structure]
    while pending:
        node = pending.pop()
        nodeType = node.get('type')
        if nodeType == 'value':
            candidates.append((cutPos - node.get('end_pos', cutPos), node))
        elif nodeType == 'truncated_value':
            # Truncated values sit directly at the cut.
            candidates.append((0, node))
        # Reversed push keeps document (pre-order) order for equal distances.
        pending.extend(reversed(node.get('children', [])))

    # Stable sort: closest to the cut first.
    candidates.sort(key=lambda item: item[0])

    budgetRemaining = self.budgetLimit
    valuesWithBudget = set()
    for _, node in candidates:
        valSize = len(str(node.get('raw', '')))
        if budgetRemaining >= valSize:
            valuesWithBudget.add(id(node))
            budgetRemaining -= valSize

    return self._renderNode(structure, valuesWithBudget, indent=0)
|
|
|
|
def _collectValuesWithDistance(self, node: dict, cutPos: int, depth: int = 0) -> list:
|
|
"""Collect all value nodes with their distance from cut point"""
|
|
values = []
|
|
|
|
if node.get('type') == 'value':
|
|
endPos = node.get('end_pos', cutPos)
|
|
distance = cutPos - endPos
|
|
values.append({
|
|
'id': id(node),
|
|
'node': node,
|
|
'distance': distance,
|
|
'raw': node.get('raw', ''),
|
|
'depth': depth
|
|
})
|
|
elif node.get('type') == 'truncated_value':
|
|
values.append({
|
|
'id': id(node),
|
|
'node': node,
|
|
'distance': 0, # Truncated values are at the cut
|
|
'raw': node.get('raw', ''),
|
|
'depth': depth
|
|
})
|
|
|
|
for child in node.get('children', []):
|
|
values.extend(self._collectValuesWithDistance(child, cutPos, depth + 1))
|
|
|
|
return values
|
|
|
|
def _renderNode(self, node: dict, valuesWithBudget: set, indent: int = 0) -> str:
|
|
"""Render a node with budget constraints"""
|
|
indent_str = " " * indent
|
|
|
|
node_type = node.get('type')
|
|
|
|
if node_type == 'root':
|
|
parts = []
|
|
for child in node.get('children', []):
|
|
parts.append(self._renderNode(child, valuesWithBudget, indent))
|
|
return '\n'.join(parts)
|
|
|
|
elif node_type == 'object':
|
|
return self._renderObject(node, valuesWithBudget, indent)
|
|
|
|
elif node_type == 'array':
|
|
return self._renderArray(node, valuesWithBudget, indent)
|
|
|
|
elif node_type == 'value':
|
|
return self._renderValue(node, valuesWithBudget, indent)
|
|
|
|
elif node_type == 'truncated_value':
|
|
return node.get('raw', '')
|
|
|
|
return ''
|
|
|
|
def _renderObject(self, node: dict, valuesWithBudget: set, indent: int) -> str:
|
|
"""Render an object node"""
|
|
indent_str = " " * indent
|
|
inner_indent = " " * (indent + 1)
|
|
|
|
key_prefix = ""
|
|
if node.get('key'):
|
|
key_prefix = f'"{node["key"]}": '
|
|
|
|
if not node.get('children'):
|
|
if node.get('complete'):
|
|
return f"{key_prefix}{{}}"
|
|
else:
|
|
return f"{key_prefix}{{"
|
|
|
|
parts = [f"{key_prefix}{{"]
|
|
|
|
children = node.get('children', [])
|
|
for i, child in enumerate(children):
|
|
child_rendered = self._renderNode(child, valuesWithBudget, indent + 1)
|
|
|
|
# Add comma if not last and next sibling exists
|
|
if i < len(children) - 1:
|
|
if child.get('type') != 'truncated_value':
|
|
parts.append(f"{inner_indent}{child_rendered},")
|
|
else:
|
|
parts.append(f"{inner_indent}{child_rendered}")
|
|
else:
|
|
parts.append(f"{inner_indent}{child_rendered}")
|
|
|
|
if node.get('complete'):
|
|
parts.append(f"{indent_str}}}")
|
|
|
|
return '\n'.join(parts)
|
|
|
|
def _renderArray(self, node: dict, valuesWithBudget: set, indent: int) -> str:
|
|
"""Render an array node"""
|
|
indent_str = " " * indent
|
|
inner_indent = " " * (indent + 1)
|
|
|
|
key_prefix = ""
|
|
if node.get('key'):
|
|
key_prefix = f'"{node["key"]}": '
|
|
|
|
if not node.get('children'):
|
|
if node.get('complete'):
|
|
return f"{key_prefix}[]"
|
|
else:
|
|
return f"{key_prefix}["
|
|
|
|
parts = [f"{key_prefix}["]
|
|
|
|
children = node.get('children', [])
|
|
for i, child in enumerate(children):
|
|
child_rendered = self._renderNode(child, valuesWithBudget, indent + 1)
|
|
|
|
if i < len(children) - 1:
|
|
if child.get('type') != 'truncated_value':
|
|
parts.append(f"{inner_indent}{child_rendered},")
|
|
else:
|
|
parts.append(f"{inner_indent}{child_rendered}")
|
|
else:
|
|
parts.append(f"{inner_indent}{child_rendered}")
|
|
|
|
if node.get('complete'):
|
|
parts.append(f"{indent_str}]")
|
|
|
|
return '\n'.join(parts)
|
|
|
|
def _renderValue(self, node: dict, valuesWithBudget: set, indent: int) -> str:
|
|
"""Render a value node"""
|
|
key_prefix = ""
|
|
if node.get('key'):
|
|
key_prefix = f'"{node["key"]}": '
|
|
|
|
if id(node) in valuesWithBudget:
|
|
# Full value
|
|
default_raw = '"...\"'
|
|
raw_value = node.get('raw', default_raw)
|
|
return f"{key_prefix}{raw_value}"
|
|
else:
|
|
# Placeholder
|
|
return f'{key_prefix}"..."'
|
|
|
|
def _renderFromStructure(self, structure: dict) -> str:
    """
    Render the complete structure with every value shown.

    All node ids are marked as allocated and summary mode is off, so the V3
    renderer applies no budget restriction.

    Args:
        structure: Root node of the parsed tree.

    Returns:
        The rendered text.
    """
    everyNodeId = set()
    self._collectAllNodeIds(structure, everyNodeId)

    unlimited = BudgetAllocation(
        allocated_node_ids=everyNodeId,
        path_node_ids=set(),
        summary_mode=False
    )
    return self._renderNodeV3(structure, 0, unlimited)
|
|
|
|
def _collectAllNodeIds(self, node: dict, result: set):
|
|
"""Collect all node IDs for unlimited rendering"""
|
|
result.add(id(node))
|
|
for child in node.get('children', []):
|
|
self._collectAllNodeIds(child, result)
|
|
|
|
def _renderWithBudgetFromStructure(self, structure: dict, cutPos: int) -> str:
    """
    Render the structure under the value budget, allocating from the cut outwards.

    Phase 1 - path:
        ``_buildPathFromCutToRootV3`` yields the nodes from the cut element
        up to the root; their ids form the path set.

    Phase 2 - allocation:
        All value nodes are collected and sorted by distance to the cut
        (smaller distance first). Values on the path are handled first: a
        truncated value is always allocated, a complete value only while its
        raw length fits into the remaining budget. Unless that already
        switched on summary mode, the remaining values are allocated the
        same way. Summary mode is switched on once fewer than 50 characters
        of budget remain.

    Phase 3 - rendering:
        ``_renderNodeV3`` renders the tree using the allocation.

    Args:
        structure: Root node of the parsed tree.
        cutPos: Position of the cut in the input string.

    Returns:
        Rendered JSON text with the budget applied.
    """
    # Phase 1: nodes between the cut element and the root.
    pathFromCutToRoot = []
    self._buildPathFromCutToRootV3(structure, cutPos, [], pathFromCutToRoot)
    pathNodeIds = {id(pathNode) for pathNode in pathFromCutToRoot}

    # Phase 2: rank every value by its distance to the cut.
    rankedValues = []
    self._collectAllValuesWithDistance(structure, cutPos, rankedValues)
    rankedValues.sort(key=lambda entry: entry['distance'])

    allocation = BudgetAllocation(
        path_node_ids=pathNodeIds,
        allocated_node_ids=set(),
        summary_mode=False
    )
    remainingBudget = self.budgetLimit

    # Phase 2a: values that lie on the path.
    for entry in rankedValues:
        candidate = entry['node']
        if id(candidate) not in pathNodeIds:
            continue

        candidateType = candidate.get('type')
        if candidateType == 'truncated_value':
            # The truncated value is always shown and costs no budget.
            allocation.allocated_node_ids.add(id(candidate))
        elif candidateType == 'value':
            cost = len(candidate.get('raw', ''))
            if cost <= remainingBudget:
                allocation.allocated_node_ids.add(id(candidate))
                remainingBudget -= cost
            if remainingBudget < 50:
                allocation.summary_mode = True

    # Phase 2b: everything else, unless the path already exhausted the budget.
    if not allocation.summary_mode:
        for entry in rankedValues:
            candidate = entry['node']
            if id(candidate) in pathNodeIds or candidate.get('type') != 'value':
                continue

            cost = len(candidate.get('raw', ''))
            if cost <= remainingBudget:
                allocation.allocated_node_ids.add(id(candidate))
                remainingBudget -= cost
            if remainingBudget < 50:
                allocation.summary_mode = True

    # Phase 3: render using the allocation.
    return self._renderNodeV3(structure, 0, allocation)
|
|
|
|
def _buildPathFromCutToRootV3(self, node: dict, cutPos: int, currentPath: list, resultPath: list) -> bool:
|
|
"""
|
|
Recursively find the path from root to cut element, then reverse it.
|
|
Result path is ordered: [cut_element, parent, ..., root]
|
|
"""
|
|
nodeType = node.get('type')
|
|
startPos = node.get('start_pos', 0)
|
|
endPos = node.get('end_pos', cutPos + 1)
|
|
|
|
pathWithCurrent = currentPath + [node]
|
|
|
|
for child in node.get('children', []):
|
|
if self._buildPathFromCutToRootV3(child, cutPos, pathWithCurrent, resultPath):
|
|
return True
|
|
|
|
if nodeType == 'truncated_value':
|
|
resultPath.clear()
|
|
resultPath.extend(reversed(pathWithCurrent))
|
|
return True
|
|
|
|
if nodeType == 'value' and startPos <= cutPos <= endPos:
|
|
resultPath.clear()
|
|
resultPath.extend(reversed(pathWithCurrent))
|
|
return True
|
|
|
|
if nodeType in ('object', 'array') and not node.get('complete') and startPos <= cutPos:
|
|
resultPath.clear()
|
|
resultPath.extend(reversed(pathWithCurrent))
|
|
return True
|
|
|
|
if nodeType == 'root' and not resultPath:
|
|
resultPath.clear()
|
|
resultPath.extend(reversed(pathWithCurrent))
|
|
return True
|
|
|
|
return False
|
|
|
|
def _collectAllValuesWithDistance(self, node: dict, cutPos: int, result: list, depth: int = 0):
|
|
"""Collect ALL value nodes with their distance to cut point."""
|
|
nodeType = node.get('type')
|
|
|
|
if nodeType in ('value', 'truncated_value'):
|
|
endPos = node.get('end_pos', cutPos)
|
|
distance = cutPos - endPos
|
|
result.append({
|
|
'node': node,
|
|
'distance': distance,
|
|
'depth': depth
|
|
})
|
|
|
|
for child in node.get('children', []):
|
|
self._collectAllValuesWithDistance(child, cutPos, result, depth + 1)
|
|
|
|
def _renderNodeV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Render one node using the budget allocation, dispatching on its ``type``.

    Args:
        node: Tree node (root, object, array, value or truncated_value).
        depth: Current nesting level.
        allocation: Allocation state passed through to the specific renderers.

    Returns:
        The rendered text; an empty string for unknown node types.
    """
    kind = node.get('type')

    if kind == 'root':
        rendered = [
            self._renderNodeV3(child, depth, allocation)
            for child in node.get('children', [])
        ]
        return '\n'.join(rendered)

    if kind == 'object':
        return self._renderObjectV3(node, depth, allocation)

    if kind == 'array':
        return self._renderArrayV3(node, depth, allocation)

    if kind == 'value':
        return self._renderValueV3(node, depth, allocation)

    if kind == 'truncated_value':
        # Truncated text is emitted as-is, preceded by its key if it has one.
        label = f'"{node.get("key")}": ' if node.get('key') else ''
        return f"{label}{node.get('raw', '')}"

    return ''
|
|
|
|
def _renderObjectV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Render an object node using the budget allocation.

    - In summary mode an object that is not on the cut path collapses to
      ``<object>``.
    - An incomplete object without an incomplete object/array child contains
      the cut directly; its text is copied verbatim from the input string
      (from its ``start_pos`` to the end) to preserve the original formatting.
    - Otherwise the children are rendered one per line; the closing brace is
      only written for complete objects.

    Args:
        node: Object node.
        depth: Nesting level of this object.
        allocation: Allocation state (path ids, allocated ids, summary mode).

    Returns:
        The rendered text.
    """
    indentStr = " " * depth
    innerIndent = " " * (depth + 1)

    keyPrefix = f'"{node.get("key")}": ' if node.get('key') else ''
    children = node.get('children', [])
    isOnPath = id(node) in allocation.path_node_ids

    if allocation.summary_mode and not isOnPath:
        return f"{keyPrefix}<object>"

    startPos = node.get('start_pos')
    if not node.get('complete') and startPos is not None:
        hasIncompleteChild = any(
            child.get('type') in ('object', 'array') and not child.get('complete')
            for child in children
        )
        if not hasIncompleteChild:
            # start_pos is where the object itself begins, so the slice does
            # not contain the member key; prepend it, otherwise a keyed
            # object would be printed inside its parent without its name.
            return f"{keyPrefix}{self.jsonStr[startPos:]}"

    if not children:
        return f"{keyPrefix}{{}}" if node.get('complete') else f"{keyPrefix}{{"

    parts = [f"{keyPrefix}{{"]
    lastIndex = len(children) - 1

    for i, child in enumerate(children):
        childRendered = self._renderNodeV3(child, depth + 1, allocation)
        # No comma after the last child or after the truncated value.
        if i == lastIndex or child.get('type') == 'truncated_value':
            parts.append(f"{innerIndent}{childRendered}")
        else:
            parts.append(f"{innerIndent}{childRendered},")

    if node.get('complete'):
        parts.append(f"{indentStr}}}")

    return '\n'.join(parts)
|
|
|
|
def _renderArrayV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Render an array node using the budget allocation.

    - In summary mode an array that is not on the cut path collapses to
      ``<array>``.
    - An incomplete array without an incomplete object/array child contains
      the cut directly; its text is copied verbatim from the input string
      (from its ``start_pos`` to the end) to preserve the original formatting.
    - An on-path array with more than 10 children is abbreviated in summary
      mode: the first 3 children, an "items omitted" marker, then the last N
      children (the ones closest to the cut).
    - Otherwise all children are rendered one per line; the closing bracket
      is only written for complete arrays.

    Args:
        node: Array node.
        depth: Nesting level of this array.
        allocation: Allocation state (path ids, allocated ids, summary mode).

    Returns:
        The rendered text.
    """
    indentStr = " " * depth
    innerIndent = " " * (depth + 1)

    keyPrefix = f'"{node.get("key")}": ' if node.get('key') else ''
    children = node.get('children', [])
    isOnPath = id(node) in allocation.path_node_ids

    if allocation.summary_mode and not isOnPath:
        return f"{keyPrefix}<array>"

    startPos = node.get('start_pos')
    if not node.get('complete') and startPos is not None:
        hasIncompleteChild = any(
            child.get('type') in ('object', 'array') and not child.get('complete')
            for child in children
        )
        if not hasIncompleteChild:
            # start_pos is where the array itself begins, so the slice does
            # not contain the member key; prepend it, otherwise a keyed
            # array would be printed inside its parent without its name.
            return f"{keyPrefix}{self.jsonStr[startPos:]}"

    if not children:
        return f"{keyPrefix}[]" if node.get('complete') else f"{keyPrefix}["

    parts = [f"{keyPrefix}["]
    lastIndex = len(children) - 1

    def appendChild(index: int) -> None:
        """Render children[index]; no comma after the last or a truncated child."""
        child = children[index]
        childRendered = self._renderNodeV3(child, depth + 1, allocation)
        if index == lastIndex or child.get('type') == 'truncated_value':
            parts.append(f"{innerIndent}{childRendered}")
        else:
            parts.append(f"{innerIndent}{childRendered},")

    if isOnPath and len(children) > 10 and allocation.summary_mode:
        showFirst = 3  # leading children shown for context
        # Estimate ~80 chars per child and spend half the budget (at least
        # 500 chars) on the children next to the cut.
        estimatedCharsPerChild = 80
        budgetForEnd = max(500, self.budgetLimit // 2)
        showLast = max(5, budgetForEnd // estimatedCharsPerChild)
        showLast = min(showLast, len(children) - showFirst - 1)  # no overlap with the head
        startIdx = len(children) - showLast

        shownIds = {id(child) for child in children[:showFirst]}
        shownIds.update(id(child) for child in children[startIdx:])

        # The shown children are put on the path for the duration of this
        # call so that they are not collapsed to <object>/<array>.
        originalPathIds = allocation.path_node_ids
        allocation.path_node_ids = originalPathIds | shownIds
        try:
            for child in children[:showFirst]:
                childRendered = self._renderNodeV3(child, depth + 1, allocation)
                parts.append(f"{innerIndent}{childRendered},")

            omittedCount = len(children) - showFirst - showLast
            if omittedCount > 0:
                parts.append(f"{innerIndent}// ... ({omittedCount} items omitted) ...")

            for index in range(startIdx, len(children)):
                appendChild(index)
        finally:
            # Restore even when rendering a child raises, so the shared
            # allocation object is never left with the extended path set.
            allocation.path_node_ids = originalPathIds
    else:
        for index in range(len(children)):
            appendChild(index)

    if node.get('complete'):
        parts.append(f"{indentStr}]")

    return '\n'.join(parts)
|
|
|
|
def _renderValueV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
|
|
"""Render value - if allocated render full, else type hint."""
|
|
keyPrefix = f'"{node.get("key")}": ' if node.get('key') else ''
|
|
rawValue = node.get('raw', '""')
|
|
valueType = node.get('value_type', 'string')
|
|
|
|
typeHints = {
|
|
'string': '<str>',
|
|
'number': '<number>',
|
|
'boolean': '<boolean>',
|
|
'null': '<null>'
|
|
}
|
|
typeHint = typeHints.get(valueType, '<value>')
|
|
|
|
if id(node) in allocation.allocated_node_ids:
|
|
return f"{keyPrefix}{rawValue}"
|
|
else:
|
|
return f"{keyPrefix}{typeHint}"
|
|
|
|
def _calculateDistancesForBudget(self, node: dict, cutPos: int):
|
|
"""Calculate distance from cut point for each value node"""
|
|
if node.get('type') == 'value':
|
|
endPos = node.get('end_pos', cutPos)
|
|
node['distance'] = cutPos - endPos
|
|
elif node.get('type') == 'truncated_value':
|
|
node['distance'] = 0 # At cut point
|
|
else:
|
|
for child in node.get('children', []):
|
|
self._calculateDistancesForBudget(child, cutPos)
|
|
|
|
def _collectValuesWithDistance(self, node: dict, values: list, cutPos: int):
|
|
"""Collect all value nodes with their distance"""
|
|
if node.get('type') == 'value':
|
|
values.append({
|
|
'node': node,
|
|
'distance': node.get('distance', cutPos),
|
|
'raw': node.get('raw', '')
|
|
})
|
|
for child in node.get('children', []):
|
|
self._collectValuesWithDistance(child, values, cutPos)
|
|
|
|
def _isSiblingOf(self, node: dict, other: dict, structure: dict) -> bool:
|
|
"""Check if two nodes are siblings (same parent)"""
|
|
# This is a simplified check - in practice we'd need parent tracking
|
|
# For now, assume nodes at same depth with same parent are siblings
|
|
return False # TODO: implement proper sibling detection if needed
|
|
|
|
def _collectCompleteValues(self, node: dict) -> list:
|
|
"""Collect all complete (non-truncated) value nodes (strings, numbers, booleans, null)"""
|
|
values = []
|
|
|
|
# Collect all value types, not just strings (needed for arrays of numbers)
|
|
if node.get('type') == 'value':
|
|
values.append({
|
|
'start_pos': node['start_pos'],
|
|
'end_pos': node['end_pos'],
|
|
'raw': node['raw'],
|
|
'key': node.get('key')
|
|
})
|
|
|
|
for child in node.get('children', []):
|
|
values.extend(self._collectCompleteValues(child))
|
|
|
|
return values
|
|
|
|
|
|
def extractContinuationContexts(
    truncatedJson: str
) -> Tuple[str, str, str]:
    """
    Main entry point: extract the continuation contexts from a truncated JSON string.

    The contexts describe a JSON text that was cut off mid-stream:

    1. Overlap context: the innermost object / array element containing the
       cut point. It is used to merge the truncated part with the newly
       generated part and is kept exactly as in the original string so that
       string matching works during the merge.

    2. Hierarchy context: the hierarchical structure from the root down to
       the cut point. With budget logic, values close to the cut are shown
       in full while values further away are replaced by placeholders. It
       gives the AI the context of the whole JSON structure.

    3. Complete part: the valid JSON up to the cut point. All open
       structures are closed (}, ], "), incomplete keys are removed, and the
       result can be parsed directly as JSON.

    Uses module constants BUDGET_LIMIT and OVERLAP_MAX_CHARS.

    NOTE(review): the return annotation declares three strings, while the
    result documented below (and unpacked in the example) has four entries -
    confirm against getJsonContinuationContext.

    Args:
        truncatedJson: The truncated JSON string

    Returns:
        Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart):
        - overlapContext: The innermost object/element containing the cut (for merging)
        - hierarchyContext: Full structure from root to cut WITHOUT budget limitations
        - hierarchyContextForPrompt: Full structure from root to cut WITH budget limitations
        - completePart: Valid JSON with all structures properly closed

    Example:
        >>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor'
        >>> overlap, hierarchy, hierarchyForPrompt, complete = extractContinuationContexts(jsonStr)
        >>> import json
        >>> parsed = json.loads(complete)  # parses successfully
    """
    contexts = getJsonContinuationContext(truncatedJson)
    return contexts
|
|
|
|
|
|
# =============================================================================
|
|
# JSON REPAIR FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def _repairInternalJsonErrors(jsonStr: str) -> str:
|
|
"""
|
|
Repair internal JSON errors WITHOUT touching incomplete structures at cut point.
|
|
|
|
This function fixes common internal JSON issues:
|
|
- Invalid escape sequences (e.g., \\x, \\u without proper hex)
|
|
- Unescaped control characters
|
|
- Invalid Unicode characters
|
|
- Trailing commas before closing brackets/braces
|
|
- Comments (// and /* */)
|
|
- Single quotes instead of double quotes (outside of string values)
|
|
- Unquoted keys
|
|
|
|
IMPORTANT: Does NOT modify incomplete structures at the end of the JSON.
|
|
Those are handled separately by structure closing logic.
|
|
|
|
Args:
|
|
jsonStr: JSON string that may have internal errors
|
|
|
|
Returns:
|
|
Repaired JSON string with internal errors fixed
|
|
"""
|
|
if not jsonStr or not jsonStr.strip():
|
|
return jsonStr
|
|
|
|
result = jsonStr
|
|
|
|
# Fix 1: Remove BOM and normalize whitespace at start
|
|
if result.startswith('\ufeff'):
|
|
result = result[1:]
|
|
|
|
# Fix 2: Normalize smart quotes to straight quotes
|
|
result = result.replace('"', '"').replace('"', '"')
|
|
result = result.replace(''', "'").replace(''', "'")
|
|
|
|
# Fix 3: Remove JavaScript-style comments (but be careful not to break strings)
|
|
result = _removeJsonComments(result)
|
|
|
|
# Fix 4: Fix invalid escape sequences
|
|
result = _fixInvalidEscapeSequences(result)
|
|
|
|
# Fix 5: Remove trailing commas before ] or }
|
|
result = _removeTrailingCommas(result)
|
|
|
|
# Fix 6: Fix unquoted keys (simple cases only)
|
|
result = _fixUnquotedKeys(result)
|
|
|
|
# Fix 7: Fix unescaped quotes inside string values
|
|
# This handles AI-generated JSON with quotes like: "text with "quoted" words"
|
|
result = _fixUnescapedQuotesInStrings(result)
|
|
|
|
# Fix 8: Fix unescaped control characters (ASCII 0-31)
|
|
result = _fixUnescapedControlCharacters(result)
|
|
|
|
return result
|
|
|
|
|
|
def _removeJsonComments(jsonStr: str) -> str:
|
|
"""Remove JavaScript-style comments from JSON, preserving strings."""
|
|
result = []
|
|
i = 0
|
|
inString = False
|
|
escaped = False
|
|
|
|
while i < len(jsonStr):
|
|
char = jsonStr[i]
|
|
|
|
if escaped:
|
|
result.append(char)
|
|
escaped = False
|
|
i += 1
|
|
continue
|
|
|
|
if char == '\\' and inString:
|
|
result.append(char)
|
|
escaped = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == '"':
|
|
inString = not inString
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if inString:
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
# Check for // comment
|
|
if char == '/' and i + 1 < len(jsonStr) and jsonStr[i + 1] == '/':
|
|
# Skip until end of line
|
|
while i < len(jsonStr) and jsonStr[i] != '\n':
|
|
i += 1
|
|
continue
|
|
|
|
# Check for /* */ comment
|
|
if char == '/' and i + 1 < len(jsonStr) and jsonStr[i + 1] == '*':
|
|
i += 2
|
|
while i + 1 < len(jsonStr):
|
|
if jsonStr[i] == '*' and jsonStr[i + 1] == '/':
|
|
i += 2
|
|
break
|
|
i += 1
|
|
continue
|
|
|
|
result.append(char)
|
|
i += 1
|
|
|
|
return ''.join(result)
|
|
|
|
|
|
def _fixInvalidEscapeSequences(jsonStr: str) -> str:
|
|
"""Fix invalid escape sequences in JSON strings."""
|
|
result = []
|
|
i = 0
|
|
inString = False
|
|
|
|
while i < len(jsonStr):
|
|
char = jsonStr[i]
|
|
|
|
if char == '"' and (i == 0 or jsonStr[i - 1] != '\\'):
|
|
inString = not inString
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if inString and char == '\\' and i + 1 < len(jsonStr):
|
|
nextChar = jsonStr[i + 1]
|
|
|
|
# Valid JSON escape sequences: \", \\, \/, \b, \f, \n, \r, \t, \uXXXX
|
|
validEscapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
|
|
|
|
if nextChar in validEscapes:
|
|
if nextChar == 'u':
|
|
# Check if followed by 4 hex digits
|
|
if i + 5 < len(jsonStr) and all(c in '0123456789abcdefABCDEF' for c in jsonStr[i + 2:i + 6]):
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
else:
|
|
# Invalid \u sequence - escape the backslash
|
|
result.append('\\')
|
|
result.append('\\')
|
|
i += 1
|
|
continue
|
|
else:
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
else:
|
|
# Invalid escape - escape the backslash
|
|
result.append('\\')
|
|
result.append('\\')
|
|
i += 1
|
|
continue
|
|
|
|
result.append(char)
|
|
i += 1
|
|
|
|
return ''.join(result)
|
|
|
|
|
|
def _removeTrailingCommas(jsonStr: str) -> str:
|
|
"""Remove trailing commas before ] or } (not valid in JSON)."""
|
|
# Pattern: comma followed by whitespace and ] or }
|
|
result = re.sub(r',(\s*[}\]])', r'\1', jsonStr)
|
|
return result
|
|
|
|
|
|
def _fixUnquotedKeys(jsonStr: str) -> str:
|
|
"""
|
|
Fix simple unquoted keys in JSON objects.
|
|
Only handles simple cases to avoid breaking valid JSON.
|
|
"""
|
|
# Pattern: { or , followed by whitespace and an unquoted identifier and :
|
|
# Be conservative - only fix clear cases
|
|
|
|
result = []
|
|
i = 0
|
|
inString = False
|
|
escaped = False
|
|
|
|
while i < len(jsonStr):
|
|
char = jsonStr[i]
|
|
|
|
if escaped:
|
|
result.append(char)
|
|
escaped = False
|
|
i += 1
|
|
continue
|
|
|
|
if char == '\\' and inString:
|
|
result.append(char)
|
|
escaped = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == '"':
|
|
inString = not inString
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if inString:
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
# Check for unquoted key after { or ,
|
|
if char in '{,' and i + 1 < len(jsonStr):
|
|
result.append(char)
|
|
i += 1
|
|
|
|
# Skip whitespace
|
|
while i < len(jsonStr) and jsonStr[i] in ' \t\n\r':
|
|
result.append(jsonStr[i])
|
|
i += 1
|
|
|
|
if i >= len(jsonStr):
|
|
continue
|
|
|
|
# Check if next is an unquoted identifier (starts with letter or _)
|
|
if jsonStr[i] not in '"{[' and (jsonStr[i].isalpha() or jsonStr[i] == '_'):
|
|
# Collect the identifier
|
|
keyStart = i
|
|
while i < len(jsonStr) and (jsonStr[i].isalnum() or jsonStr[i] == '_'):
|
|
i += 1
|
|
key = jsonStr[keyStart:i]
|
|
|
|
# Skip whitespace
|
|
while i < len(jsonStr) and jsonStr[i] in ' \t\n\r':
|
|
i += 1
|
|
|
|
# Check if followed by :
|
|
if i < len(jsonStr) and jsonStr[i] == ':':
|
|
# This was an unquoted key - quote it
|
|
result.append('"')
|
|
result.append(key)
|
|
result.append('"')
|
|
else:
|
|
# Not a key, put back as-is
|
|
result.append(key)
|
|
continue
|
|
|
|
result.append(char)
|
|
i += 1
|
|
|
|
return ''.join(result)
|
|
|
|
|
|
def _fixUnescapedQuotesInStrings(jsonStr: str) -> str:
|
|
"""
|
|
Fix unescaped quotes inside JSON string values.
|
|
|
|
AI often generates JSON with unescaped quotes like:
|
|
"text with "quoted" words"
|
|
|
|
This should be:
|
|
"text with \"quoted\" words"
|
|
|
|
Strategy:
|
|
- Parse JSON structure to find string values
|
|
- Within a string, find unescaped quotes that are followed by content
|
|
that looks like it continues the string (not a : or , or } or ])
|
|
- Escape those quotes
|
|
"""
|
|
if not jsonStr or not jsonStr.strip():
|
|
return jsonStr
|
|
|
|
result = []
|
|
i = 0
|
|
inString = False
|
|
stringStart = -1
|
|
escaped = False
|
|
|
|
while i < len(jsonStr):
|
|
char = jsonStr[i]
|
|
|
|
if escaped:
|
|
result.append(char)
|
|
escaped = False
|
|
i += 1
|
|
continue
|
|
|
|
if char == '\\' and inString:
|
|
result.append(char)
|
|
escaped = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == '"':
|
|
if not inString:
|
|
# Starting a string
|
|
inString = True
|
|
stringStart = i
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
else:
|
|
# Could be end of string OR unescaped quote inside string
|
|
# Look ahead to determine
|
|
nextNonSpace = i + 1
|
|
while nextNonSpace < len(jsonStr) and jsonStr[nextNonSpace] in ' \t\n\r':
|
|
nextNonSpace += 1
|
|
|
|
if nextNonSpace < len(jsonStr):
|
|
nextChar = jsonStr[nextNonSpace]
|
|
|
|
# If next char is a structural character, this is end of string
|
|
if nextChar in ':,}]':
|
|
inString = False
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
# If next char is a quote, might be end of string followed by another string
|
|
# Check if we're at a reasonable string end (has a colon or comma before next structure)
|
|
if nextChar == '"':
|
|
# This is end of string, start of next
|
|
inString = False
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
# Otherwise, this quote is INSIDE the string - escape it!
|
|
result.append('\\')
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
else:
|
|
# End of JSON - this must be closing quote
|
|
inString = False
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
result.append(char)
|
|
i += 1
|
|
|
|
return ''.join(result)
|
|
|
|
|
|
def _fixUnescapedControlCharacters(jsonStr: str) -> str:
|
|
"""
|
|
Fix unescaped control characters in JSON strings.
|
|
|
|
JSON requires control characters (ASCII 0-31) to be escaped as \\uXXXX.
|
|
Common ones have shortcuts: \\n, \\r, \\t, \\b, \\f
|
|
|
|
This function finds unescaped control chars inside strings and escapes them.
|
|
"""
|
|
if not jsonStr or not jsonStr.strip():
|
|
return jsonStr
|
|
|
|
result = []
|
|
i = 0
|
|
inString = False
|
|
escaped = False
|
|
|
|
# Mapping of common control chars to their escape sequences
|
|
controlEscapes = {
|
|
'\n': '\\n',
|
|
'\r': '\\r',
|
|
'\t': '\\t',
|
|
'\b': '\\b',
|
|
'\f': '\\f',
|
|
}
|
|
|
|
while i < len(jsonStr):
|
|
char = jsonStr[i]
|
|
|
|
if escaped:
|
|
result.append(char)
|
|
escaped = False
|
|
i += 1
|
|
continue
|
|
|
|
if char == '\\' and inString:
|
|
result.append(char)
|
|
escaped = True
|
|
i += 1
|
|
continue
|
|
|
|
if char == '"':
|
|
inString = not inString
|
|
result.append(char)
|
|
i += 1
|
|
continue
|
|
|
|
if inString:
|
|
# Check for control characters (ASCII 0-31)
|
|
if ord(char) < 32:
|
|
if char in controlEscapes:
|
|
result.append(controlEscapes[char])
|
|
else:
|
|
# Use \uXXXX format for other control chars
|
|
result.append(f'\\u{ord(char):04x}')
|
|
i += 1
|
|
continue
|
|
|
|
result.append(char)
|
|
i += 1
|
|
|
|
return ''.join(result)
|
|
|
|
|
|
def _tryParseJson(jsonStr: str) -> tuple:
|
|
"""
|
|
Try to parse JSON string and return (parsed, error).
|
|
|
|
Returns:
|
|
Tuple of (parsed_object, error_string)
|
|
- If successful: (parsed_object, None)
|
|
- If failed: (None, error_message)
|
|
"""
|
|
if not jsonStr or not jsonStr.strip():
|
|
return None, "Empty JSON string"
|
|
|
|
try:
|
|
parsed = json.loads(jsonStr)
|
|
return parsed, None
|
|
except json.JSONDecodeError as e:
|
|
return None, str(e)
|
|
except Exception as e:
|
|
return None, str(e)
|
|
|
|
|
|
# Convenience function with named results
|
|
def getContexts(
    truncatedJson: str
) -> JsonContinuationContexts:
    """
    Build all continuation contexts for a (possibly) truncated JSON string.

    Steps performed:
    1. Probe whether the stripped input already parses as JSON. If it does,
       the input is treated as complete (no cut point).
    2. Call extractContinuationContexts() to obtain overlap, hierarchy,
       hierarchy-for-prompt and completePart.
    3. If the input was complete, overwrite the overlap context with "".
    4. Parse completePart. If that fails, run _repairInternalJsonErrors() on
       it and parse again; the repaired text replaces completePart only when
       the second parse succeeds.

    IMPORTANT: overlapContext == "" is the signal that the JSON is complete
    and no further continuation data is expected.

    Args:
        truncatedJson: The truncated (or complete) JSON string.

    Returns:
        JsonContinuationContexts with:
        - overlapContext: context around the cut point, or "" when the
          original input was already valid JSON
        - hierarchyContext: hierarchy context as returned by
          extractContinuationContexts (full structure, for internal use)
        - hierarchyContextForPrompt: budget-limited hierarchy context as
          returned by extractContinuationContexts (for prompts)
        - completePart: the closed JSON text (repaired version if the repair
          was needed and succeeded, otherwise as extracted)
        - jsonParsingSuccess: True if completePart parses as JSON

    Example:
        >>> json_str = '{"users": [{"name": "John", "bio": "Hello Wor'
        >>> contexts = getContexts(json_str)
        >>> print(contexts.overlapContext)  # Contains cut point context
        >>> print(contexts.jsonParsingSuccess)

        >>> complete_json = '{"users": [{"name": "John"}]}'
        >>> contexts = getContexts(complete_json)
        >>> print(contexts.overlapContext)  # "" (empty - JSON is complete)
        >>> print(contexts.jsonParsingSuccess)  # True
    """
    # Step 1: does the untouched input already parse?
    jsonIsComplete = False
    hasContent = bool(truncatedJson and truncatedJson.strip())
    if hasContent:
        _, error = _tryParseJson(truncatedJson.strip())
        if error is None:
            jsonIsComplete = True
            logger.debug("Original JSON is already complete (no cut point)")

    # Step 2: extract all contexts from the raw input
    contexts = extractContinuationContexts(truncatedJson)
    overlap, hierarchy, hierarchyForPrompt, completePart = contexts

    # Step 3: an empty overlap tells callers that no continuation is needed
    if jsonIsComplete:
        overlap = ""
        logger.debug("Setting overlapContext='' (JSON is complete)")

    # Step 4: validate completePart, repairing it once if necessary
    jsonParsingSuccess = False
    if completePart and completePart.strip():
        _, error = _tryParseJson(completePart)

        if error is not None:
            logger.debug(f"Initial parse failed: {error}, attempting repair")
            repairedCompletePart = _repairInternalJsonErrors(completePart)
            _, error = _tryParseJson(repairedCompletePart)

            if error is None:
                # Only adopt the repaired text when it actually parses
                completePart = repairedCompletePart
                logger.debug("JSON repair successful")
            else:
                # Keep the unrepaired completePart and report failure
                logger.debug(f"JSON repair also failed: {error}")

        jsonParsingSuccess = error is None

    return JsonContinuationContexts(
        overlapContext=overlap,
        hierarchyContext=hierarchy,
        hierarchyContextForPrompt=hierarchyForPrompt,
        completePart=completePart,
        jsonParsingSuccess=jsonParsingSuccess
    )
|