gateway/modules/shared/jsonContinuation.py
2026-04-10 12:33:27 +02:00

2224 lines
80 KiB
Python

"""
JSON Continuation Context Module
Generiert drei Kontexte für abgeschnittene JSON-Strings:
1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält
2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut mit Budget-Logik
3. Complete Part: Der vollständige Teil des JSONs mit allen Strukturen geschlossen
Hauptfunktionen:
- extractContinuationContexts(truncatedJson: str) -> Tuple[str, str, str]
Extrahiert alle drei Kontexte aus einem abgeschnittenen JSON-String.
- getContexts(truncatedJson: str) -> JsonContinuationContexts
Gibt alle Kontexte als Pydantic-Modell zurück mit benannten Feldern.
Modulkonstanten:
- BUDGET_LIMIT: int = 2000
Zeichen-Budget für vollständige Datenwerte im Hierarchy Context
- OVERLAP_MAX_CHARS: int = 1000
Maximale Zeichen für den Overlap Context
Verwendung:
>>> from modules.shared.jsonContinuation import getContexts
>>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor'
>>> contexts = getContexts(jsonStr)
>>> print(contexts.overlapContext)
>>> print(contexts.hierarchyContext)
>>> print(contexts.completePart)
Autor: Claude
Version: 2.0
"""
import json
import logging
import re
from typing import Tuple, List, Optional, Any, Set
from dataclasses import dataclass, field
from enum import Enum
from modules.datamodels.datamodelAi import JsonContinuationContexts
logger = logging.getLogger(__name__)
# =============================================================================
# MODULE CONSTANTS
# =============================================================================
# Default character budget for fully rendered data values in the hierarchy
# context. Used whenever a caller passes budgetLimit=None.
BUDGET_LIMIT: int = 2000
"""Zeichen-Budget für vollständige Datenwerte im Hierarchy Context"""
# Default maximum number of characters of the overlap context. Used whenever a
# caller passes overlapMaxChars=None.
OVERLAP_MAX_CHARS: int = 1000
"""Maximale Zeichen für den Overlap Context"""
# =============================================================================
# TOKEN TYPES AND DATA CLASSES
# =============================================================================
class JsonTokenType(Enum):
    """Kinds of tokens the tokenizer can hand out for (possibly truncated) JSON."""
    # Structural single-character tokens; the enum value is the character itself.
    OBJECT_START = "{"
    OBJECT_END = "}"
    ARRAY_START = "["
    ARRAY_END = "]"
    # Literal values.
    STRING = "string"
    NUMBER = "number"
    BOOLEAN = "boolean"
    NULL = "null"
    # Separators.
    COLON = ":"
    COMMA = ","
    # NOTE(review): the tokenizer visible in this file emits object keys as
    # STRING tokens; KEY is not produced there - confirm whether it is used elsewhere.
    KEY = "key"
    # End of input reached without a pending token.
    EOF = "eof"
    # Input ended (or an unexpected character appeared) in the middle of a token.
    TRUNCATED = "truncated"
@dataclass
class JsonToken:
    """Represents a JSON token with position info"""
    # Kind of token (structural character, literal, EOF or TRUNCATED).
    type: JsonTokenType
    # Payload as produced by the tokenizer: the text between the quotes for
    # strings (escape sequences are NOT decoded), int/float for numbers,
    # True/False/None for keywords, the character itself for punctuation.
    value: Any
    # Index of the token's first character in the source string.
    start_pos: int
    # Index one past the token's last character (exclusive end).
    end_pos: int
    raw: str # Original string representation
@dataclass
class StackFrame:
    """
    Represents one open container (object or array) in the JSON hierarchy.

    Frames are pushed while scanning the truncated JSON and popped when the
    matching closing bracket is seen, so the remaining frames describe the
    containers that are still open at the cut point.
    """
    type: str  # "object" or "array"
    start_pos: int  # Index of the opening bracket/brace in the source string
    key: Optional[str] = None  # Current key for objects
    index: int = 0  # Current index for arrays
    content: str = ""  # Accumulated content for this frame
    # Keys seen in this object. A default_factory gives every frame its own
    # list; Optional is kept because callers may still pass None explicitly.
    keys_seen: Optional[List[str]] = field(default_factory=list)

    def __post_init__(self):
        # Backward compatibility: an explicit keys_seen=None still becomes [].
        if self.keys_seen is None:
            self.keys_seen = []
class JsonTokenizer:
    """
    Tokenizer for potentially truncated JSON strings.

    The tokenizer never raises on malformed or cut-off input. A token that is
    cut off by the end of the input (an unterminated string, a partial
    keyword) or an unexpected character is reported as a TRUNCATED token.
    Once the input is exhausted, nextJsonToken() keeps returning EOF.
    """

    # Characters skipped between tokens.
    _WHITESPACE = ' \t\n\r'

    # Single-character structural tokens and their token types.
    _PUNCTUATION = {
        '{': JsonTokenType.OBJECT_START,
        '}': JsonTokenType.OBJECT_END,
        '[': JsonTokenType.ARRAY_START,
        ']': JsonTokenType.ARRAY_END,
        ':': JsonTokenType.COLON,
        ',': JsonTokenType.COMMA,
    }

    # Keyword text, token type and Python value, probed in this order.
    _KEYWORDS = (
        ('true', JsonTokenType.BOOLEAN, True),
        ('false', JsonTokenType.BOOLEAN, False),
        ('null', JsonTokenType.NULL, None),
    )

    def __init__(self, jsonStr: str):
        self.jsonStr = jsonStr
        self.pos = 0
        self.length = len(jsonStr)

    def skipWhitespace(self):
        """Advance the cursor past any JSON whitespace."""
        while self.pos < self.length and self.jsonStr[self.pos] in self._WHITESPACE:
            self.pos += 1

    def peek(self) -> Optional[str]:
        """Return the character at the cursor without consuming it (None at end)."""
        if self.pos < self.length:
            return self.jsonStr[self.pos]
        return None

    def readString(self) -> JsonToken:
        """
        Read a string token; the cursor must be on the opening quote.

        Returns a STRING token whose value is the text between the quotes
        (escape sequences are left undecoded). If the input ends before the
        closing quote, a TRUNCATED token with the partial text is returned.
        """
        start_pos = self.pos
        self.pos += 1  # Skip opening quote
        escaped = False
        while self.pos < self.length:
            char = self.jsonStr[self.pos]
            self.pos += 1
            if escaped:
                # Character after a backslash never terminates the string.
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                raw = self.jsonStr[start_pos:self.pos]
                return JsonToken(JsonTokenType.STRING, raw[1:-1], start_pos, self.pos, raw)
        # String was truncated
        raw = self.jsonStr[start_pos:self.pos]
        return JsonToken(JsonTokenType.TRUNCATED, raw[1:], start_pos, self.pos, raw)

    def _skipDigits(self):
        """Advance the cursor past a run of digit characters."""
        while self.pos < self.length and self.jsonStr[self.pos].isdigit():
            self.pos += 1

    def readNumber(self) -> JsonToken:
        """
        Read a number token (optional sign, digits, fraction, exponent).

        The value is an int or float; if the consumed text cannot be converted
        (for example a lone '-'), the raw text is used as the value.
        """
        start_pos = self.pos
        # Handle negative
        if self.pos < self.length and self.jsonStr[self.pos] == '-':
            self.pos += 1
        self._skipDigits()
        # Decimal part
        if self.pos < self.length and self.jsonStr[self.pos] == '.':
            self.pos += 1
            self._skipDigits()
        # Exponent
        if self.pos < self.length and self.jsonStr[self.pos] in 'eE':
            self.pos += 1
            if self.pos < self.length and self.jsonStr[self.pos] in '+-':
                self.pos += 1
            self._skipDigits()
        raw = self.jsonStr[start_pos:self.pos]
        try:
            value = float(raw) if '.' in raw or 'e' in raw.lower() else int(raw)
        except ValueError:
            value = raw
        return JsonToken(JsonTokenType.NUMBER, value, start_pos, self.pos, raw)

    def readKeyword(self) -> JsonToken:
        """
        Read true, false, or null.

        Any other run of letters (for example a keyword cut off by truncation)
        is returned as a TRUNCATED token.
        """
        start_pos = self.pos
        for keyword, token_type, value in self._KEYWORDS:
            # startswith with an offset avoids copying the rest of the input.
            if self.jsonStr.startswith(keyword, self.pos):
                self.pos += len(keyword)
                return JsonToken(token_type, value, start_pos, self.pos, keyword)
        # Partial keyword (truncated)
        while self.pos < self.length and self.jsonStr[self.pos].isalpha():
            self.pos += 1
        raw = self.jsonStr[start_pos:self.pos]
        return JsonToken(JsonTokenType.TRUNCATED, raw, start_pos, self.pos, raw)

    def nextJsonToken(self) -> JsonToken:
        """Consume and return the next token (EOF once the input is exhausted)."""
        self.skipWhitespace()
        if self.pos >= self.length:
            return JsonToken(JsonTokenType.EOF, None, self.pos, self.pos, "")
        char = self.jsonStr[self.pos]
        startPos = self.pos
        punctuationType = self._PUNCTUATION.get(char)
        if punctuationType is not None:
            self.pos += 1
            return JsonToken(punctuationType, char, startPos, self.pos, char)
        if char == '"':
            return self.readString()
        if char == '-' or char.isdigit():
            return self.readNumber()
        if char.isalpha():
            return self.readKeyword()
        # Unknown character, treat as truncated
        self.pos += 1
        return JsonToken(JsonTokenType.TRUNCATED, char, startPos, self.pos, char)
@dataclass
class HierarchyLevel:
    """
    Represents one level in the parsed hierarchy

    NOTE(review): no code visible in this part of the file instantiates this
    class - confirm whether it is still in use.
    """
    type: str # "object" or "array"
    start_pos: int
    end_pos: int # -1 if not closed
    key: Optional[str] # Key if this is a value in an object
    index: Optional[int] # Index if this is in an array
    content: dict # Parsed content at this level
    raw_start: str # Raw string from start to children
    children_content: List[Any] # For arrays: list of parsed elements
def getJsonContinuationContext(
    truncatedJson: str,
    budgetLimit: Optional[int] = None,
    overlapMaxChars: Optional[int] = None
) -> Tuple[str, str, str, str]:
    """
    Generate the four continuation contexts for a truncated JSON string.

    Args:
        truncatedJson: The truncated JSON string.
        budgetLimit: Character budget for data values in the prompt hierarchy
            context; BUDGET_LIMIT is used when None.
        overlapMaxChars: Maximum number of characters of the overlap context;
            OVERLAP_MAX_CHARS is used when None.

    Returns:
        Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt,
        completePart):
        - overlapContext: The innermost object/element containing the cut (for merging)
        - hierarchyContext: Structure from root to cut WITHOUT budget limits (internal use)
        - hierarchyContextForPrompt: Structure from root to cut WITH budget limits (for prompts)
        - completePart: Valid JSON with all open structures closed
    """
    effectiveBudget = BUDGET_LIMIT if budgetLimit is None else budgetLimit
    effectiveOverlap = OVERLAP_MAX_CHARS if overlapMaxChars is None else overlapMaxChars
    return JsonAnalyzer(truncatedJson, effectiveBudget, effectiveOverlap).analyze()
@dataclass
class BudgetAllocation:
    """Tracks which nodes have been allocated budget"""
    # id() values of structure nodes that were granted budget; the caller
    # visible in this file fills it with the ids of all nodes for unlimited rendering.
    allocated_node_ids: Set[int] = field(default_factory=set)
    # NOTE(review): presumably the ids of nodes on the path from the cut to the
    # root - the code that fills this set is outside the visible part of the file.
    path_node_ids: Set[int] = field(default_factory=set)
    # NOTE(review): meaning is defined by the renderer that consumes it (not visible here).
    summary_mode: bool = False
class JsonAnalyzer:
"""
Analyzes truncated JSON and generates continuation contexts.
analyze() returns four contexts for a truncated JSON string:
1. Overlap Context: The innermost object/array element containing the cut point
2. Hierarchy Context: The original truncated JSON string, unchanged
3. Hierarchy Context For Prompt: The hierarchical structure from root to cut, rendered with budget logic
4. Complete Part: The complete part of the JSON with all structures properly closed
"""
def __init__(self, jsonStr: str, budgetLimit: Optional[int] = None, overlapMaxChars: Optional[int] = None):
self.jsonStr = jsonStr
self.budgetLimit = budgetLimit if budgetLimit is not None else BUDGET_LIMIT
self.overlapMaxChars = overlapMaxChars if overlapMaxChars is not None else OVERLAP_MAX_CHARS
self.stack: List[StackFrame] = []
self.hierarchy: List[dict] = [] # Parsed hierarchy info
def analyze(self) -> Tuple[str, str, str, str]:
    """
    Analyze the truncated JSON and return all four contexts.

    Returns:
        Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt,
        completePart):
        - overlapContext: innermost open element containing the cut point
        - hierarchyContext: the original truncated JSON string, unchanged
        - hierarchyContextForPrompt: budget-limited rendering of the structure
        - completePart: the truncated JSON with all open structures closed

    NOTE(review): _parseStructure() appends to self.stack without clearing
    it, so a second analyze() call on the same instance would start from
    the frames left over by the first call.
    """
    # Track which containers are still open at the cut point.
    self._parseStructure()
    overlapContext = self._generateOverlapContext()
    # One parsed structure feeds the prompt rendering below.
    structure = self._parseForHierarchy()
    cutPos = len(self.jsonStr)
    # CRITICAL: hierarchyContext must be the EXACT original JSON so that merge
    # overlap detection can match it character by character; a re-rendered
    # version would be formatted differently.
    hierarchyContext = self.jsonStr
    # Budget-limited rendering, intended for prompts.
    hierarchyContextForPrompt = self._renderWithBudgetFromStructure(structure, cutPos)
    # JSON with all structures closed.
    completePart = self._generateCompletePart()
    return overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart
def _generateCompletePart(self) -> str:
"""
Generate the complete part of the JSON with all structures properly closed.
This creates valid JSON by closing all open strings, brackets/braces.
Unvollständige Keys werden entfernt, damit das Ergebnis valides JSON ist.
Unvollständige Keywords (true, false, null) werden vervollständigt.
Strategy:
1. Take the full truncated JSON
2. If we're in the middle of a string, close it
3. Complete incomplete keywords (tr → true, f → false, n → null)
4. Remove incomplete key-value pairs (keys without values)
5. Close all open brackets/braces
"""
result = self.jsonStr.rstrip()
# Remove trailing comma if present (after stripping)
if result.endswith(','):
result = result[:-1]
# Check if we need to close an open string
stringClosing = self._getStringClosing(result)
result += stringClosing
# Complete incomplete keywords (true, false, null)
result = self._completeIncompleteKeywords(result)
# Check if we're in the middle of a key (after colon)
# If string was just closed and we're after a colon with no value, remove the key
result = self._cleanIncompleteKeyValue(result)
# Close all open structures
closingBrackets = self._getClosingBrackets(result)
return result + closingBrackets
def _getStringClosing(self, jsonStr: str) -> str:
"""Check if there's an unclosed string and return closing quote if needed."""
in_string = False
escaped = False
for char in jsonStr:
if escaped:
escaped = False
continue
if char == '\\' and in_string:
escaped = True
continue
if char == '"':
in_string = not in_string
return '"' if in_string else ""
def _cleanIncompleteKeyValue(self, jsonStr: str) -> str:
"""
Clean up incomplete key-value pairs.
Handles cases like:
- {"key": "incompl -> keep (valid truncated value)
- {"key": -> remove key
- {"a": 1, "key -> remove incomplete key (was in middle of key name)
"""
stripped = jsonStr.rstrip()
# Pattern: ends with colon (possibly with whitespace) - incomplete value
if stripped.endswith(':'):
# Find the start of this key and remove the whole key-value
return self._removeLastKey(stripped)
# Check if we just closed a string that was an incomplete key
# Pattern: ..., "something" or { "something" where something has no colon after
# This happens when we close a truncated key name like "add" -> "add"
if stripped.endswith('"'):
# Look for the pattern: comma/bracket + whitespace + "string"
# and check if this was supposed to be a key
if self._isIncompleteKey(stripped):
return self._removeLastKey(stripped)
return jsonStr
def _completeIncompleteKeywords(self, jsonStr: str) -> str:
"""
Complete incomplete JSON keywords at the end of the string.
Checks the last element for incomplete keywords after colon:
- ": t*" or ": f*" or ": n*" -> complete to true/false/null
- ": " or ":" (without keyword) -> set to null
"""
result = jsonStr.rstrip()
# Find the last colon (not in string)
in_string = False
escaped = False
last_colon_pos = -1
for i in range(len(result) - 1, -1, -1):
char = result[i]
if escaped:
escaped = False
continue
if char == '\\' and in_string:
escaped = True
continue
if char == '"':
in_string = not in_string
continue
if not in_string and char == ':':
last_colon_pos = i
break
if last_colon_pos < 0:
return result
# Get text after the last colon
after_colon = result[last_colon_pos + 1:].strip()
# Check for incomplete keyword patterns
if after_colon.startswith('t') or after_colon.startswith('T'):
# Incomplete true
keyword_start = last_colon_pos + 1
# Skip whitespace
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
keyword_start += 1
# Remove partial keyword
keyword_end = keyword_start + 1
while keyword_end < len(result) and result[keyword_end].isalpha():
keyword_end += 1
return result[:keyword_start] + 'true' + result[keyword_end:]
elif after_colon.startswith('f') or after_colon.startswith('F'):
# Incomplete false
keyword_start = last_colon_pos + 1
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
keyword_start += 1
keyword_end = keyword_start + 1
while keyword_end < len(result) and result[keyword_end].isalpha():
keyword_end += 1
return result[:keyword_start] + 'false' + result[keyword_end:]
elif after_colon.startswith('n') or after_colon.startswith('N'):
# Incomplete null
keyword_start = last_colon_pos + 1
while keyword_start < len(result) and result[keyword_start] in ' \t\n\r':
keyword_start += 1
keyword_end = keyword_start + 1
while keyword_end < len(result) and result[keyword_end].isalpha():
keyword_end += 1
return result[:keyword_start] + 'null' + result[keyword_end:]
elif not after_colon or after_colon == '':
# No keyword after colon -> set to null
return result + 'null'
return result
def _isIncompleteKey(self, jsonStr: str) -> bool:
"""
Check if the last string in the JSON is an incomplete key in an object.
This happens when truncation occurred in the middle of a key name.
Only applies to objects, not arrays.
"""
# Find the last complete string
pos = len(jsonStr) - 1
if jsonStr[pos] != '"':
return False
# Find the opening quote of this string
stringStart = pos - 1
while stringStart >= 0:
if jsonStr[stringStart] == '"':
# Check it's not escaped
numBackslashes = 0
checkPos = stringStart - 1
while checkPos >= 0 and jsonStr[checkPos] == '\\':
numBackslashes += 1
checkPos -= 1
if numBackslashes % 2 == 0:
break
stringStart -= 1
if stringStart < 0:
return False
# Now stringStart points to opening quote
# Check what's before it (skip whitespace)
beforePos = stringStart - 1
while beforePos >= 0 and jsonStr[beforePos] in ' \t\n\r':
beforePos -= 1
if beforePos < 0:
return False
# For this to be an incomplete key, it must be preceded by { or ,
# AND we must be inside an object (not an array)
if jsonStr[beforePos] not in ',{':
return False
# Now check if we're in an object context (not array)
# Count open braces/brackets to determine context
braceCount = 0
bracketCount = 0
inString = False
for i in range(beforePos + 1):
char = jsonStr[i]
if char == '"' and (i == 0 or jsonStr[i-1] != '\\'):
inString = not inString
elif not inString:
if char == '{':
braceCount += 1
elif char == '}':
braceCount -= 1
elif char == '[':
bracketCount += 1
elif char == ']':
bracketCount -= 1
# If we have more open braces than brackets at this point,
# we're in an object context
# Actually, we need to check the innermost container
# Let's track the stack properly
stack = []
inString = False
for i in range(beforePos + 1):
char = jsonStr[i]
if char == '"' and (i == 0 or jsonStr[i-1] != '\\'):
inString = not inString
elif not inString:
if char == '{':
stack.append('object')
elif char == '[':
stack.append('array')
elif char == '}':
if stack and stack[-1] == 'object':
stack.pop()
elif char == ']':
if stack and stack[-1] == 'array':
stack.pop()
# If innermost container is an object, this is an incomplete key
return len(stack) > 0 and stack[-1] == 'object'
def _removeLastKey(self, jsonStr: str) -> str:
"""Remove the last incomplete key-value pair from the JSON string."""
stripped = jsonStr.rstrip()
# Find the last comma or opening bracket before the incomplete key
pos = len(stripped) - 1
# Skip past the current string/key
in_string = False
while pos >= 0:
char = stripped[pos]
if char == '"' and (pos == 0 or stripped[pos-1] != '\\'):
in_string = not in_string
if not in_string and char in ',{':
break
pos -= 1
if pos < 0:
return stripped
if stripped[pos] == ',':
# Remove from comma onwards
return stripped[:pos]
elif stripped[pos] == '{':
# Keep the opening brace
return stripped[:pos+1]
return stripped
def _findLastCompletePosition(self) -> int:
    """
    Find the end position of the last complete value in the JSON.

    Tokens are read until EOF or the first truncated token. Closing brackets,
    numbers, booleans, null and strings that are not followed by a colon
    count as completed values.

    Returns:
        End offset of the last completed value, or len(self.jsonStr) when no
        value was completed.
    """
    tokenizer = JsonTokenizer(self.jsonStr)
    stopTypes = (JsonTokenType.EOF, JsonTokenType.TRUNCATED)
    valueEndTypes = (
        JsonTokenType.OBJECT_END,
        JsonTokenType.ARRAY_END,
        JsonTokenType.NUMBER,
        JsonTokenType.BOOLEAN,
        JsonTokenType.NULL,
    )
    lastValueEnd = 0
    while True:
        token = tokenizer.nextJsonToken()
        if token.type in stopTypes:
            break
        if token.type in valueEndTypes:
            lastValueEnd = token.end_pos
        elif token.type == JsonTokenType.STRING:
            # Look ahead without consuming: a string followed by ':' is a key.
            resumeAt = tokenizer.pos
            tokenizer.skipWhitespace()
            isKey = tokenizer.peek() == ':'
            tokenizer.pos = resumeAt
            if not isKey:
                lastValueEnd = token.end_pos
    return lastValueEnd if lastValueEnd > 0 else len(self.jsonStr)
def _getClosingBrackets(self, jsonStr: str) -> str:
"""Determine what closing brackets are needed."""
stack = []
in_string = False
escaped = False
for char in jsonStr:
if escaped:
escaped = False
continue
if char == '\\' and in_string:
escaped = True
continue
if char == '"':
in_string = not in_string
continue
if in_string:
continue
if char == '{':
stack.append('}')
elif char == '[':
stack.append(']')
elif char == '}':
if stack and stack[-1] == '}':
stack.pop()
elif char == ']':
if stack and stack[-1] == ']':
stack.pop()
# Return closing brackets in reverse order
return ''.join(reversed(stack))
def _parseStructure(self):
    """
    Scan the JSON and leave the still-open containers on self.stack.

    Frames are pushed for every opening bracket/brace and popped for a
    matching closer. Scanning stops at EOF or at the first truncated token.
    Object frames record the keys seen so far, array frames count elements
    by commas.
    """
    tokenizer = JsonTokenizer(self.jsonStr)
    terminators = (JsonTokenType.EOF, JsonTokenType.TRUNCATED)
    token = tokenizer.nextJsonToken()
    while token.type not in terminators:
        kind = token.type
        if kind == JsonTokenType.OBJECT_START:
            self.stack.append(
                StackFrame(type="object", start_pos=token.start_pos, keys_seen=[])
            )
        elif kind == JsonTokenType.ARRAY_START:
            self.stack.append(
                StackFrame(type="array", start_pos=token.start_pos, index=0)
            )
        elif kind == JsonTokenType.OBJECT_END:
            if self.stack and self.stack[-1].type == "object":
                self.stack.pop()
        elif kind == JsonTokenType.ARRAY_END:
            if self.stack and self.stack[-1].type == "array":
                self.stack.pop()
        elif kind == JsonTokenType.STRING:
            # Could be a key or a value
            self._handleStringJsonToken(token, tokenizer)
        elif kind == JsonTokenType.COMMA:
            # A comma directly inside an array starts the next element.
            if self.stack and self.stack[-1].type == "array":
                self.stack[-1].index += 1
        token = tokenizer.nextJsonToken()
def _handleStringJsonToken(self, token: JsonToken, tokenizer: JsonTokenizer):
    """
    Record a string token as the current key if it is an object key.

    Only strings directly inside an object and followed by a colon count as
    keys. The tokenizer position is restored after the look-ahead.
    """
    if not self.stack:
        return
    frame = self.stack[-1]
    if frame.type != "object":
        return
    resumePos = tokenizer.pos
    tokenizer.skipWhitespace()
    if tokenizer.peek() == ':':
        # This is a key
        frame.key = token.value
        frame.keys_seen.append(token.value)
    tokenizer.pos = resumePos
def _generateOverlapContext(self) -> str:
"""
Generate the overlap context - the innermost object/array element containing the cut.
Returns the raw string from the start of that element to the end of the truncated JSON.
Dieser Kontext wird verwendet, um den abgeschnittenen Teil mit dem neuen Teil zu mergen.
Exakt so wie im Original-String (für String-Matching beim Merge).
SPECIAL CASE: If cut point is within a list element, return the entire list object (from opening bracket).
"""
if not self.stack:
# No structure, return last overlap_max_chars characters
return self.jsonStr[-self.overlapMaxChars:]
# Find the innermost container that should be the overlap
innermost = self.stack[-1]
# SPECIAL CASE: If innermost is an array, return the entire array (from opening bracket)
if innermost.type == "array":
overlap_start = innermost.start_pos
else:
# For objects, use the standard logic
overlap_start = self._findInnermostElementStart()
overlap = self.jsonStr[overlap_start:]
# Apply max chars limit
if len(overlap) > self.overlapMaxChars:
overlap = self.jsonStr[-self.overlapMaxChars:]
return overlap
def _findAllArrayElementStarts(self, arrayFrame: StackFrame) -> List[int]:
"""Find all element start positions in an array"""
arrayContent = self.jsonStr[arrayFrame.start_pos:]
# Skip the opening bracket and whitespace
pos = 1
while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r':
pos += 1
elementStarts = [arrayFrame.start_pos + pos]
depth = 0
inString = False
escaped = False
i = pos
while i < len(arrayContent):
char = arrayContent[i]
if escaped:
escaped = False
i += 1
continue
if char == '\\' and inString:
escaped = True
i += 1
continue
if char == '"':
inString = not inString
i += 1
continue
if inString:
i += 1
continue
if char in '{[':
depth += 1
elif char in '}]':
depth -= 1
elif char == ',' and depth == 0:
# Found element boundary
i += 1
# Skip whitespace
while i < len(arrayContent) and arrayContent[i] in ' \t\n\r':
i += 1
elementStarts.append(arrayFrame.start_pos + i)
i += 1
return elementStarts
def _findInnermostElementStart(self) -> int:
"""Find the start position of the innermost element for overlap"""
if not self.stack:
return max(0, len(self.jsonStr) - self.overlapMaxChars)
# Walk through stack to find the innermost array element or object
# We want the innermost "atomic" unit that contains the cut
# Strategy:
# - If innermost is an object: return its start
# - If innermost is an array:
# - If current element is an object/array: return start of that element
# - If current element is a primitive: return start of array or last N chars
innermost = self.stack[-1]
if innermost.type == "object":
return innermost.start_pos
else:
# It's an array - find the start of the current element
element_start = self._findArrayElementStart(innermost)
# Check if the element is a primitive or complex type
element_content = self.jsonStr[element_start:].strip()
# If it starts with { or [ it's complex, return the element start
if element_content and element_content[0] in '{[':
return element_start
else:
# Primitive in array - check if there's a parent object
# or return overlap_max_chars from end
for i in range(len(self.stack) - 2, -1, -1):
if self.stack[i].type == "object":
return self.stack[i].start_pos
# No parent object, return max chars from end
return max(0, len(self.jsonStr) - self.overlapMaxChars)
def _findArrayElementStart(self, arrayFrame: StackFrame) -> int:
"""Find the start position of the current array element"""
# We need to find the start of the current element in the array
# Parse from array start to find element boundaries
arrayContent = self.jsonStr[arrayFrame.start_pos:]
# Skip the opening bracket and whitespace
pos = 1
while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r':
pos += 1
elementStarts = [arrayFrame.start_pos + pos]
depth = 0
inString = False
escaped = False
i = pos
while i < len(arrayContent):
char = arrayContent[i]
if escaped:
escaped = False
i += 1
continue
if char == '\\' and inString:
escaped = True
i += 1
continue
if char == '"':
inString = not inString
i += 1
continue
if inString:
i += 1
continue
if char in '{[':
depth += 1
elif char in '}]':
depth -= 1
elif char == ',' and depth == 0:
# Found element boundary
i += 1
# Skip whitespace
while i < len(arrayContent) and arrayContent[i] in ' \t\n\r':
i += 1
elementStarts.append(arrayFrame.start_pos + i)
i += 1
# Return the start of the current element
if arrayFrame.index < len(elementStarts):
return elementStarts[arrayFrame.index]
elif elementStarts:
return elementStarts[-1]
else:
return arrayFrame.start_pos
def _generateHierarchyContext(self) -> str:
"""
Generate the hierarchy context with budget logic.
Shows structure from root to cut point with data values limited by budget.
"""
if not self.stack:
# No structure
return self.jsonStr[-self.overlapMaxChars:]
# We need to rebuild the JSON with budget logic
# Priority: elements closer to cut get full values, distant ones get "..."
return self._rebuildWithBudget()
def _rebuildWithBudget(self) -> str:
    """
    Rebuild the JSON from root to cut with budget constraints.

    The input is first parsed into a node structure, then rendered; values
    that do not fit into the budget (allocated from the cut backwards) are
    rendered as "...".
    """
    return self._renderWithBudget(self._parseForHierarchy())
def _parseForHierarchy(self) -> dict:
    """
    Parse JSON into a structure suitable for hierarchy rendering.

    Returns:
        A 'root' node dict whose 'children' hold the parsed tree. Node kinds:
        - 'object' / 'array': 'key', 'children', 'start_pos'; closed
          containers additionally carry 'end_pos' and 'complete' = True
        - 'value': 'key', 'value', 'raw', 'start_pos', 'end_pos', 'value_type'
        - 'truncated_value': 'raw' (everything from the token start to the
          end of the input), 'start_pos' and, inside objects, 'key'
    """
    result = {
        'type': 'root',
        'children': [],
        'raw_positions': []
    }
    tokenizer = JsonTokenizer(self.jsonStr)
    # Innermost open container is last; the root node is never popped.
    stack = [result]
    # Key read most recently in the current object, waiting for its value.
    current_key = None
    while True:
        token = tokenizer.nextJsonToken()
        if token.type == JsonTokenType.EOF:
            break
        if token.type == JsonTokenType.TRUNCATED:
            # Mark the truncation point
            if stack:
                current = stack[-1]
                if current.get('type') == 'object':
                    # Only recorded when a key is pending; a token truncated
                    # where a key would be expected is not added.
                    if current_key:
                        current['children'].append({
                            'type': 'truncated_value',
                            'key': current_key,
                            'raw': self.jsonStr[token.start_pos:],
                            'start_pos': token.start_pos
                        })
                elif current.get('type') == 'array':
                    current['children'].append({
                        'type': 'truncated_value',
                        'raw': self.jsonStr[token.start_pos:],
                        'start_pos': token.start_pos
                    })
            # Nothing after the truncated token is parsed.
            break
        if token.type == JsonTokenType.OBJECT_START:
            obj = {
                'type': 'object',
                'key': current_key,
                'children': [],
                'start_pos': token.start_pos
            }
            if stack:
                stack[-1]['children'].append(obj)
            stack.append(obj)
            # The pending key now belongs to the new container.
            current_key = None
        elif token.type == JsonTokenType.ARRAY_START:
            arr = {
                'type': 'array',
                'key': current_key,
                'children': [],
                'start_pos': token.start_pos
            }
            if stack:
                stack[-1]['children'].append(arr)
            stack.append(arr)
            current_key = None
        elif token.type == JsonTokenType.OBJECT_END:
            # len(stack) > 1 keeps the root node on the stack; a closer that
            # does not match the innermost container is ignored.
            if len(stack) > 1 and stack[-1].get('type') == 'object':
                stack[-1]['end_pos'] = token.end_pos
                stack[-1]['complete'] = True
                stack.pop()
        elif token.type == JsonTokenType.ARRAY_END:
            if len(stack) > 1 and stack[-1].get('type') == 'array':
                stack[-1]['end_pos'] = token.end_pos
                stack[-1]['complete'] = True
                stack.pop()
        elif token.type == JsonTokenType.STRING:
            # Check if it's a key: look ahead for a colon without consuming it.
            saved_pos = tokenizer.pos
            tokenizer.skipWhitespace()
            next_char = tokenizer.peek()
            if next_char == ':' and stack and stack[-1].get('type') == 'object':
                current_key = token.value
            else:
                # It's a value
                value_node = {
                    'type': 'value',
                    'key': current_key,
                    'value': token.value,
                    'raw': token.raw,
                    'start_pos': token.start_pos,
                    'end_pos': token.end_pos,
                    'value_type': 'string'
                }
                if stack:
                    stack[-1]['children'].append(value_node)
                current_key = None
            # Undo the look-ahead so the colon is tokenized normally.
            tokenizer.pos = saved_pos
        elif token.type in (JsonTokenType.NUMBER, JsonTokenType.BOOLEAN, JsonTokenType.NULL):
            value_node = {
                'type': 'value',
                'key': current_key,
                'value': token.value,
                'raw': token.raw,
                'start_pos': token.start_pos,
                'end_pos': token.end_pos,
                'value_type': str(token.type.value)
            }
            if stack:
                stack[-1]['children'].append(value_node)
            current_key = None
    return result
def _renderWithBudget(self, structure: dict) -> str:
"""Render the structure with budget constraints"""
# First, collect all value nodes with their distances from cut
cutPos = len(self.jsonStr)
allValues = self._collectValuesWithDistance(structure, cutPos)
# Sort by distance (closest to cut first)
allValues.sort(key=lambda x: x['distance'])
# Determine which values get full rendering
budgetRemaining = self.budgetLimit
valuesWithBudget = set()
for valInfo in allValues:
valSize = len(str(valInfo['raw']))
if budgetRemaining >= valSize:
valuesWithBudget.add(valInfo['id'])
budgetRemaining -= valSize
# Now render the structure
return self._renderNode(structure, valuesWithBudget, indent=0)
def _collectValuesWithDistance(self, node: dict, cutPos: int, depth: int = 0) -> list:
"""Collect all value nodes with their distance from cut point"""
values = []
if node.get('type') == 'value':
endPos = node.get('end_pos', cutPos)
distance = cutPos - endPos
values.append({
'id': id(node),
'node': node,
'distance': distance,
'raw': node.get('raw', ''),
'depth': depth
})
elif node.get('type') == 'truncated_value':
values.append({
'id': id(node),
'node': node,
'distance': 0, # Truncated values are at the cut
'raw': node.get('raw', ''),
'depth': depth
})
for child in node.get('children', []):
values.extend(self._collectValuesWithDistance(child, cutPos, depth + 1))
return values
def _renderNode(self, node: dict, valuesWithBudget: set, indent: int = 0) -> str:
"""Render a node with budget constraints"""
indent_str = " " * indent
node_type = node.get('type')
if node_type == 'root':
parts = []
for child in node.get('children', []):
parts.append(self._renderNode(child, valuesWithBudget, indent))
return '\n'.join(parts)
elif node_type == 'object':
return self._renderObject(node, valuesWithBudget, indent)
elif node_type == 'array':
return self._renderArray(node, valuesWithBudget, indent)
elif node_type == 'value':
return self._renderValue(node, valuesWithBudget, indent)
elif node_type == 'truncated_value':
return node.get('raw', '')
return ''
def _renderObject(self, node: dict, valuesWithBudget: set, indent: int) -> str:
"""Render an object node"""
indent_str = " " * indent
inner_indent = " " * (indent + 1)
key_prefix = ""
if node.get('key'):
key_prefix = f'"{node["key"]}": '
if not node.get('children'):
if node.get('complete'):
return f"{key_prefix}{{}}"
else:
return f"{key_prefix}{{"
parts = [f"{key_prefix}{{"]
children = node.get('children', [])
for i, child in enumerate(children):
child_rendered = self._renderNode(child, valuesWithBudget, indent + 1)
# Add comma if not last and next sibling exists
if i < len(children) - 1:
if child.get('type') != 'truncated_value':
parts.append(f"{inner_indent}{child_rendered},")
else:
parts.append(f"{inner_indent}{child_rendered}")
else:
parts.append(f"{inner_indent}{child_rendered}")
if node.get('complete'):
parts.append(f"{indent_str}}}")
return '\n'.join(parts)
def _renderArray(self, node: dict, valuesWithBudget: set, indent: int) -> str:
"""Render an array node"""
indent_str = " " * indent
inner_indent = " " * (indent + 1)
key_prefix = ""
if node.get('key'):
key_prefix = f'"{node["key"]}": '
if not node.get('children'):
if node.get('complete'):
return f"{key_prefix}[]"
else:
return f"{key_prefix}["
parts = [f"{key_prefix}["]
children = node.get('children', [])
for i, child in enumerate(children):
child_rendered = self._renderNode(child, valuesWithBudget, indent + 1)
if i < len(children) - 1:
if child.get('type') != 'truncated_value':
parts.append(f"{inner_indent}{child_rendered},")
else:
parts.append(f"{inner_indent}{child_rendered}")
else:
parts.append(f"{inner_indent}{child_rendered}")
if node.get('complete'):
parts.append(f"{indent_str}]")
return '\n'.join(parts)
def _renderValue(self, node: dict, valuesWithBudget: set, indent: int) -> str:
"""Render a value node"""
key_prefix = ""
if node.get('key'):
key_prefix = f'"{node["key"]}": '
if id(node) in valuesWithBudget:
# Full value
default_raw = '"...\"'
raw_value = node.get('raw', default_raw)
return f"{key_prefix}{raw_value}"
else:
# Placeholder
return f'{key_prefix}"..."'
def _renderFromStructure(self, structure: dict) -> str:
    """
    Render the whole structure with every value shown (no budget limits).

    Every node id in the tree is marked as allocated, the path set is left
    empty and summary mode is off, then the V3 renderer is used.

    Args:
        structure: Root node of the parsed structure tree.

    Returns:
        The rendered text produced by `_renderNodeV3`.
    """
    everyNodeId = set()
    self._collectAllNodeIds(structure, everyNodeId)

    unrestricted = BudgetAllocation(
        path_node_ids=set(),
        allocated_node_ids=everyNodeId,
        summary_mode=False
    )
    return self._renderNodeV3(structure, 0, unrestricted)
def _collectAllNodeIds(self, node: dict, result: set):
"""Collect all node IDs for unlimited rendering"""
result.add(id(node))
for child in node.get('children', []):
self._collectAllNodeIds(child, result)
def _renderWithBudgetFromStructure(self, structure: dict, cutPos: int) -> str:
    """
    Render the structure under the character budget, favouring values near the cut.

    Phase 1 - path:
        `_buildPathFromCutToRootV3` fills a list ordered
        [cut_element, parent, ..., root]; the ids of these nodes form the path set.

    Phase 2 - allocation:
        All value nodes are collected with their distance to the cut and sorted
        ascending by distance (closer to the cut = earlier). Budget
        (`self.budgetLimit` characters of raw text) is spent first on values
        that are on the path, then on all other values. A truncated value on
        the path is always allocated and costs nothing. Once the remaining
        budget drops below 50, `summary_mode` is switched on; if that already
        happens during the path phase, non-path values get no budget at all.

    Phase 3 - render:
        The tree is rendered by `_renderNodeV3` using the allocation.

    Args:
        structure: Root node of the parsed structure tree.
        cutPos: Position of the cut in the original JSON string.

    Returns:
        Rendered JSON-like string with budget constraints applied.
    """
    # Phase 1: Build path from cut to root
    pathFromCutToRoot = []
    self._buildPathFromCutToRootV3(structure, cutPos, [], pathFromCutToRoot)
    pathNodeIds = set(id(node) for node in pathFromCutToRoot)

    # Phase 2: Collect ALL values and allocate budget
    allValues = []
    self._collectAllValuesWithDistance(structure, cutPos, allValues)

    # Sort by distance (smaller = closer to cut = higher priority).
    # list.sort is stable, so values with equal distance keep traversal order.
    allValues.sort(key=lambda x: x['distance'])

    # Initialize allocation tracker
    allocation = BudgetAllocation(
        path_node_ids=pathNodeIds,
        allocated_node_ids=set(),
        summary_mode=False
    )
    remainingBudget = self.budgetLimit

    # Phase 2a: Allocate PATH values first (truncated values are always rendered)
    pathValues = [item for item in allValues if id(item['node']) in pathNodeIds]
    for item in pathValues:
        node = item['node']
        nodeType = node.get('type')

        if nodeType == 'truncated_value':
            # The cut fragment itself is free: it never consumes budget.
            allocation.allocated_node_ids.add(id(node))
            continue

        if nodeType != 'value':
            continue

        rawValue = node.get('raw', '')
        valueSize = len(rawValue)

        # A value that does not fit is skipped; later (smaller) values may still fit.
        if valueSize <= remainingBudget:
            allocation.allocated_node_ids.add(id(node))
            remainingBudget -= valueSize

        if remainingBudget < 50:
            allocation.summary_mode = True

    # Phase 2b: Allocate NON-PATH values (skip if path already triggered summary mode)
    if not allocation.summary_mode:
        nonPathValues = [item for item in allValues if id(item['node']) not in pathNodeIds]
        for item in nonPathValues:
            node = item['node']
            nodeType = node.get('type')

            if nodeType != 'value':
                continue

            rawValue = node.get('raw', '')
            valueSize = len(rawValue)

            if valueSize <= remainingBudget:
                allocation.allocated_node_ids.add(id(node))
                remainingBudget -= valueSize

            # The loop keeps running after summary mode turns on, so small
            # values can still use up what is left of the budget.
            if remainingBudget < 50 and not allocation.summary_mode:
                allocation.summary_mode = True

    # Phase 3: Render with allocation info
    return self._renderNodeV3(structure, 0, allocation)
def _buildPathFromCutToRootV3(self, node: dict, cutPos: int, currentPath: list, resultPath: list) -> bool:
"""
Recursively find the path from root to cut element, then reverse it.
Result path is ordered: [cut_element, parent, ..., root]
"""
nodeType = node.get('type')
startPos = node.get('start_pos', 0)
endPos = node.get('end_pos', cutPos + 1)
pathWithCurrent = currentPath + [node]
for child in node.get('children', []):
if self._buildPathFromCutToRootV3(child, cutPos, pathWithCurrent, resultPath):
return True
if nodeType == 'truncated_value':
resultPath.clear()
resultPath.extend(reversed(pathWithCurrent))
return True
if nodeType == 'value' and startPos <= cutPos <= endPos:
resultPath.clear()
resultPath.extend(reversed(pathWithCurrent))
return True
if nodeType in ('object', 'array') and not node.get('complete') and startPos <= cutPos:
resultPath.clear()
resultPath.extend(reversed(pathWithCurrent))
return True
if nodeType == 'root' and not resultPath:
resultPath.clear()
resultPath.extend(reversed(pathWithCurrent))
return True
return False
def _collectAllValuesWithDistance(self, node: dict, cutPos: int, result: list, depth: int = 0):
"""Collect ALL value nodes with their distance to cut point."""
nodeType = node.get('type')
if nodeType in ('value', 'truncated_value'):
endPos = node.get('end_pos', cutPos)
distance = cutPos - endPos
result.append({
'node': node,
'distance': distance,
'depth': depth
})
for child in node.get('children', []):
self._collectAllValuesWithDistance(child, cutPos, result, depth + 1)
def _renderNodeV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Dispatch V3 rendering of one node by its 'type' field.

    Args:
        node: Structure node to render.
        depth: Nesting level used for indentation by the container renderers.
        allocation: Budget allocation consulted by the type-specific renderers.

    Returns:
        Rendered text; an empty string for an unrecognised node type.
    """
    kind = node.get('type')

    if kind == 'root':
        # Root children are rendered at the same depth and joined line by line.
        rendered = [
            self._renderNodeV3(top, depth, allocation)
            for top in node.get('children', [])
        ]
        return '\n'.join(rendered)
    if kind == 'object':
        return self._renderObjectV3(node, depth, allocation)
    if kind == 'array':
        return self._renderArrayV3(node, depth, allocation)
    if kind == 'value':
        return self._renderValueV3(node, depth, allocation)
    if kind == 'truncated_value':
        # The cut-off fragment is always shown verbatim, with its key if present.
        label = f'"{node.get("key")}": ' if node.get('key') else ''
        return f"{label}{node.get('raw', '')}"
    return ''
def _renderObjectV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Render an object node according to the budget allocation.

    - In summary mode an object that is not on the cut path collapses to
      '<object>'.
    - An incomplete object with a known 'start_pos' and no incomplete
      object/array child is returned as the verbatim tail of `self.jsonStr`
      starting at 'start_pos'.
    - Otherwise the object is rendered member by member; the closing brace is
      written only for complete objects.

    NOTE(review): the verbatim branch returns the raw slice without the key
    prefix - confirm that this is intended for keyed objects.

    Args:
        node: Object node to render.
        depth: Nesting level (two spaces per level).
        allocation: Budget allocation (path ids, summary mode).

    Returns:
        The rendered object text.
    """
    label = f'"{node.get("key")}": ' if node.get('key') else ''
    members = node.get('children', [])
    closed = node.get('complete')

    if allocation.summary_mode and id(node) not in allocation.path_node_ids:
        return f"{label}<object>"

    origin = node.get('start_pos')
    if not closed and origin is not None:
        cutIsDeeper = any(
            member.get('type') in ('object', 'array') and not member.get('complete')
            for member in members
        )
        if not cutIsDeeper:
            # The cut lies directly in this object: keep the original formatting.
            return self.jsonStr[origin:]

    if not members:
        return f"{label}{{}}" if closed else f"{label}{{"

    pad = " " * (depth + 1)
    finalIndex = len(members) - 1
    lines = [f"{label}{{"]

    for position, member in enumerate(members):
        text = self._renderNodeV3(member, depth + 1, allocation)
        if position == finalIndex or member.get('type') == 'truncated_value':
            lines.append(f"{pad}{text}")
        else:
            lines.append(f"{pad}{text},")

    if closed:
        lines.append(" " * depth + "}")
    return '\n'.join(lines)
def _renderArrayV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Render an array node according to the budget allocation.

    - In summary mode an array that is not on the cut path collapses to
      '<array>'.
    - An incomplete array with a known 'start_pos' and no incomplete
      object/array child is returned as the verbatim tail of `self.jsonStr`
      starting at 'start_pos'.
    - An on-path array with more than 10 children, rendered while summary mode
      is on, is abbreviated: the first 3 children, a
      "// ... (N items omitted) ..." marker line, then the last children
      (those closest to the cut).
    - Every other array is rendered child by child.

    The closing bracket is written only for complete arrays.

    Args:
        node: Array node to render.
        depth: Nesting level (two spaces per level).
        allocation: Budget allocation; `path_node_ids` is temporarily extended
            while an abbreviated array is rendered and restored afterwards.

    Returns:
        The rendered array text.
    """
    indentStr = "  " * depth
    innerIndent = "  " * (depth + 1)
    keyPrefix = f'"{node.get("key")}": ' if node.get('key') else ''
    children = node.get('children', [])

    isOnPath = id(node) in allocation.path_node_ids

    if allocation.summary_mode and not isOnPath:
        return f"{keyPrefix}<array>"

    # If array is incomplete and cut is directly here (no incomplete child),
    # extract exact string from original JSON to preserve formatting.
    # NOTE(review): this slice does not include keyPrefix - confirm intended.
    if not node.get('complete') and node.get('start_pos') is not None:
        hasIncompleteChild = any(
            child.get('type') in ('object', 'array') and not child.get('complete')
            for child in children
        )
        if not hasIncompleteChild:
            return self.jsonStr[node.get('start_pos'):]

    if not children:
        return f"{keyPrefix}[]" if node.get('complete') else f"{keyPrefix}["

    parts = [f"{keyPrefix}["]

    # For arrays ON PATH with many children (e.g. table rows):
    # show the first 3, then an omission marker, then the last N children,
    # so that the context next to the cut point stays visible.
    if isOnPath and len(children) > 10 and allocation.summary_mode:
        showFirst = 3  # Show first 3 for context

        # Number of trailing children is derived from a size estimate,
        # not from the actual rendered size:
        # assume ~80 chars per child and spend at least 500 chars
        # (or half of the configured budget, whichever is larger).
        estimatedCharsPerChild = 80
        budgetForEnd = max(500, self.budgetLimit // 2)  # Use half budget for end children
        showLast = max(5, budgetForEnd // estimatedCharsPerChild)
        showLast = min(showLast, len(children) - showFirst - 1)  # Don't overlap with first

        # Collect the ids of the children that will be shown, so that they
        # count as "on path" and are not collapsed to <object>/<array>.
        childrenToShow = set()
        for i in range(min(showFirst, len(children))):
            childrenToShow.add(id(children[i]))
        startIdx = len(children) - showLast
        for i in range(startIdx, len(children)):
            childrenToShow.add(id(children[i]))

        # Temporarily swap in an extended path set; the original set object
        # is not modified and is put back after rendering.
        originalPathIds = allocation.path_node_ids
        extendedPathIds = originalPathIds | childrenToShow
        allocation.path_node_ids = extendedPathIds

        # Render first N children (each followed by a comma)
        for i in range(min(showFirst, len(children))):
            child = children[i]
            childRendered = self._renderNodeV3(child, depth + 1, allocation)
            parts.append(f"{innerIndent}{childRendered},")

        # Add omission marker if there are omitted items
        omittedCount = len(children) - showFirst - showLast
        if omittedCount > 0:
            parts.append(f"{innerIndent}// ... ({omittedCount} items omitted) ...")

        # Render last N children (closest to cut)
        for i in range(startIdx, len(children)):
            child = children[i]
            childRendered = self._renderNodeV3(child, depth + 1, allocation)
            isLast = (i == len(children) - 1)
            isTruncated = child.get('type') == 'truncated_value'
            if isLast or isTruncated:
                parts.append(f"{innerIndent}{childRendered}")
            else:
                parts.append(f"{innerIndent}{childRendered},")

        # Restore original path_node_ids
        allocation.path_node_ids = originalPathIds
    else:
        # Standard rendering: every child, comma after all but the last
        # child and after none that is a truncated value.
        for i, child in enumerate(children):
            childRendered = self._renderNodeV3(child, depth + 1, allocation)
            isLast = (i == len(children) - 1)
            isTruncated = child.get('type') == 'truncated_value'
            if isLast or isTruncated:
                parts.append(f"{innerIndent}{childRendered}")
            else:
                parts.append(f"{innerIndent}{childRendered},")

    if node.get('complete'):
        parts.append(f"{indentStr}]")

    return '\n'.join(parts)
def _renderValueV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str:
    """
    Render a scalar value: full raw text if allocated, otherwise a type hint.

    Type hints are '<str>', '<number>', '<boolean>' and '<null>', chosen by the
    node's 'value_type' (default 'string'); unknown types render as '<value>'.

    Args:
        node: Value node; may carry 'key', 'raw' and 'value_type'.
        depth: Unused here; kept for a uniform renderer signature.
        allocation: Budget allocation whose `allocated_node_ids` decides
            whether the raw text is shown.

    Returns:
        '"key": ' (when a key exists) followed by the raw text or the hint.
    """
    label = f'"{node.get("key")}": ' if node.get('key') else ''
    fullText = node.get('raw', '""')

    placeholders = {
        'string': '<str>',
        'number': '<number>',
        'boolean': '<boolean>',
        'null': '<null>'
    }
    placeholder = placeholders.get(node.get('value_type', 'string'), '<value>')

    shown = fullText if id(node) in allocation.allocated_node_ids else placeholder
    return f"{label}{shown}"
def _calculateDistancesForBudget(self, node: dict, cutPos: int):
"""Calculate distance from cut point for each value node"""
if node.get('type') == 'value':
endPos = node.get('end_pos', cutPos)
node['distance'] = cutPos - endPos
elif node.get('type') == 'truncated_value':
node['distance'] = 0 # At cut point
else:
for child in node.get('children', []):
self._calculateDistancesForBudget(child, cutPos)
def _collectValuesWithDistance(self, node: dict, values: list, cutPos: int):
"""Collect all value nodes with their distance"""
if node.get('type') == 'value':
values.append({
'node': node,
'distance': node.get('distance', cutPos),
'raw': node.get('raw', '')
})
for child in node.get('children', []):
self._collectValuesWithDistance(child, values, cutPos)
def _isSiblingOf(self, node: dict, other: dict, structure: dict) -> bool:
"""Check if two nodes are siblings (same parent)"""
# This is a simplified check - in practice we'd need parent tracking
# For now, assume nodes at same depth with same parent are siblings
return False # TODO: implement proper sibling detection if needed
def _collectCompleteValues(self, node: dict) -> list:
"""Collect all complete (non-truncated) value nodes (strings, numbers, booleans, null)"""
values = []
# Collect all value types, not just strings (needed for arrays of numbers)
if node.get('type') == 'value':
values.append({
'start_pos': node['start_pos'],
'end_pos': node['end_pos'],
'raw': node['raw'],
'key': node.get('key')
})
for child in node.get('children', []):
values.extend(self._collectCompleteValues(child))
return values
def extractContinuationContexts(
    truncatedJson: str
) -> Tuple[str, str, str, str]:
    """
    Main entry point: extract the continuation contexts from a truncated JSON.

    Thin wrapper that delegates to `getJsonContinuationContext`. The result
    describes a truncated JSON string from four angles:

    1. Overlap context: the innermost object/array element containing the cut.
       - Used to merge the truncated part with the newly generated part.
       - Kept exactly as in the original string (for string matching on merge).
    2. Hierarchy context: the hierarchical structure from the root to the cut,
       without budget limitations.
    3. Hierarchy context for prompts: the same structure with budget logic -
       values close to the cut are shown in full, values further away are
       replaced by placeholders. Gives the AI the overall JSON structure.
    4. Complete part: the JSON up to the cut made valid.
       - All open structures are closed (}, ], ").
       - Incomplete keys are removed.
       - Intended to be directly parseable as JSON.

    Uses module constants BUDGET_LIMIT and OVERLAP_MAX_CHARS.

    Args:
        truncatedJson: The truncated JSON string

    Returns:
        Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart):
        - overlapContext: The innermost object/element containing the cut (for merging)
        - hierarchyContext: Full structure from root to cut WITHOUT budget limitations
        - hierarchyContextForPrompt: Full structure from root to cut WITH budget limitations
        - completePart: Valid JSON with all structures properly closed

    Example:
        >>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor'
        >>> overlap, hierarchy, hierarchyForPrompt, complete = extractContinuationContexts(jsonStr)
        >>> import json
        >>> parsed = json.loads(complete)  # works: the complete part is valid JSON
    """
    return getJsonContinuationContext(truncatedJson)
# =============================================================================
# JSON REPAIR FUNCTIONS
# =============================================================================
def _repairInternalJsonErrors(jsonStr: str) -> str:
"""
Repair internal JSON errors WITHOUT touching incomplete structures at cut point.
This function fixes common internal JSON issues:
- Invalid escape sequences (e.g., \\x, \\u without proper hex)
- Unescaped control characters
- Invalid Unicode characters
- Trailing commas before closing brackets/braces
- Comments (// and /* */)
- Single quotes instead of double quotes (outside of string values)
- Unquoted keys
IMPORTANT: Does NOT modify incomplete structures at the end of the JSON.
Those are handled separately by structure closing logic.
Args:
jsonStr: JSON string that may have internal errors
Returns:
Repaired JSON string with internal errors fixed
"""
if not jsonStr or not jsonStr.strip():
return jsonStr
result = jsonStr
# Fix 1: Remove BOM and normalize whitespace at start
if result.startswith('\ufeff'):
result = result[1:]
# Fix 2: Normalize smart quotes to straight quotes
result = result.replace('"', '"').replace('"', '"')
result = result.replace(''', "'").replace(''', "'")
# Fix 3: Remove JavaScript-style comments (but be careful not to break strings)
result = _removeJsonComments(result)
# Fix 4: Fix invalid escape sequences
result = _fixInvalidEscapeSequences(result)
# Fix 5: Remove trailing commas before ] or }
result = _removeTrailingCommas(result)
# Fix 6: Fix unquoted keys (simple cases only)
result = _fixUnquotedKeys(result)
# Fix 7: Fix unescaped quotes inside string values
# This handles AI-generated JSON with quotes like: "text with "quoted" words"
result = _fixUnescapedQuotesInStrings(result)
# Fix 8: Fix unescaped control characters (ASCII 0-31)
result = _fixUnescapedControlCharacters(result)
return result
def _removeJsonComments(jsonStr: str) -> str:
"""Remove JavaScript-style comments from JSON, preserving strings."""
result = []
i = 0
inString = False
escaped = False
while i < len(jsonStr):
char = jsonStr[i]
if escaped:
result.append(char)
escaped = False
i += 1
continue
if char == '\\' and inString:
result.append(char)
escaped = True
i += 1
continue
if char == '"':
inString = not inString
result.append(char)
i += 1
continue
if inString:
result.append(char)
i += 1
continue
# Check for // comment
if char == '/' and i + 1 < len(jsonStr) and jsonStr[i + 1] == '/':
# Skip until end of line
while i < len(jsonStr) and jsonStr[i] != '\n':
i += 1
continue
# Check for /* */ comment
if char == '/' and i + 1 < len(jsonStr) and jsonStr[i + 1] == '*':
i += 2
while i + 1 < len(jsonStr):
if jsonStr[i] == '*' and jsonStr[i + 1] == '/':
i += 2
break
i += 1
continue
result.append(char)
i += 1
return ''.join(result)
def _fixInvalidEscapeSequences(jsonStr: str) -> str:
"""Fix invalid escape sequences in JSON strings."""
result = []
i = 0
inString = False
while i < len(jsonStr):
char = jsonStr[i]
if char == '"' and (i == 0 or jsonStr[i - 1] != '\\'):
inString = not inString
result.append(char)
i += 1
continue
if inString and char == '\\' and i + 1 < len(jsonStr):
nextChar = jsonStr[i + 1]
# Valid JSON escape sequences: \", \\, \/, \b, \f, \n, \r, \t, \uXXXX
validEscapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
if nextChar in validEscapes:
if nextChar == 'u':
# Check if followed by 4 hex digits
if i + 5 < len(jsonStr) and all(c in '0123456789abcdefABCDEF' for c in jsonStr[i + 2:i + 6]):
result.append(char)
i += 1
continue
else:
# Invalid \u sequence - escape the backslash
result.append('\\')
result.append('\\')
i += 1
continue
else:
result.append(char)
i += 1
continue
else:
# Invalid escape - escape the backslash
result.append('\\')
result.append('\\')
i += 1
continue
result.append(char)
i += 1
return ''.join(result)
def _removeTrailingCommas(jsonStr: str) -> str:
"""Remove trailing commas before ] or } (not valid in JSON)."""
# Pattern: comma followed by whitespace and ] or }
result = re.sub(r',(\s*[}\]])', r'\1', jsonStr)
return result
def _fixUnquotedKeys(jsonStr: str) -> str:
"""
Fix simple unquoted keys in JSON objects.
Only handles simple cases to avoid breaking valid JSON.
"""
# Pattern: { or , followed by whitespace and an unquoted identifier and :
# Be conservative - only fix clear cases
result = []
i = 0
inString = False
escaped = False
while i < len(jsonStr):
char = jsonStr[i]
if escaped:
result.append(char)
escaped = False
i += 1
continue
if char == '\\' and inString:
result.append(char)
escaped = True
i += 1
continue
if char == '"':
inString = not inString
result.append(char)
i += 1
continue
if inString:
result.append(char)
i += 1
continue
# Check for unquoted key after { or ,
if char in '{,' and i + 1 < len(jsonStr):
result.append(char)
i += 1
# Skip whitespace
while i < len(jsonStr) and jsonStr[i] in ' \t\n\r':
result.append(jsonStr[i])
i += 1
if i >= len(jsonStr):
continue
# Check if next is an unquoted identifier (starts with letter or _)
if jsonStr[i] not in '"{[' and (jsonStr[i].isalpha() or jsonStr[i] == '_'):
# Collect the identifier
keyStart = i
while i < len(jsonStr) and (jsonStr[i].isalnum() or jsonStr[i] == '_'):
i += 1
key = jsonStr[keyStart:i]
# Skip whitespace
while i < len(jsonStr) and jsonStr[i] in ' \t\n\r':
i += 1
# Check if followed by :
if i < len(jsonStr) and jsonStr[i] == ':':
# This was an unquoted key - quote it
result.append('"')
result.append(key)
result.append('"')
else:
# Not a key, put back as-is
result.append(key)
continue
result.append(char)
i += 1
return ''.join(result)
def _fixUnescapedQuotesInStrings(jsonStr: str) -> str:
"""
Fix unescaped quotes inside JSON string values.
AI often generates JSON with unescaped quotes like:
"text with "quoted" words"
This should be:
"text with \"quoted\" words"
Strategy:
- Parse JSON structure to find string values
- Within a string, find unescaped quotes that are followed by content
that looks like it continues the string (not a : or , or } or ])
- Escape those quotes
"""
if not jsonStr or not jsonStr.strip():
return jsonStr
result = []
i = 0
inString = False
stringStart = -1
escaped = False
while i < len(jsonStr):
char = jsonStr[i]
if escaped:
result.append(char)
escaped = False
i += 1
continue
if char == '\\' and inString:
result.append(char)
escaped = True
i += 1
continue
if char == '"':
if not inString:
# Starting a string
inString = True
stringStart = i
result.append(char)
i += 1
continue
else:
# Could be end of string OR unescaped quote inside string
# Look ahead to determine
nextNonSpace = i + 1
while nextNonSpace < len(jsonStr) and jsonStr[nextNonSpace] in ' \t\n\r':
nextNonSpace += 1
if nextNonSpace < len(jsonStr):
nextChar = jsonStr[nextNonSpace]
# If next char is a structural character, this is end of string
if nextChar in ':,}]':
inString = False
result.append(char)
i += 1
continue
# If next char is a quote, might be end of string followed by another string
# Check if we're at a reasonable string end (has a colon or comma before next structure)
if nextChar == '"':
# This is end of string, start of next
inString = False
result.append(char)
i += 1
continue
# Otherwise, this quote is INSIDE the string - escape it!
result.append('\\')
result.append(char)
i += 1
continue
else:
# End of JSON - this must be closing quote
inString = False
result.append(char)
i += 1
continue
result.append(char)
i += 1
return ''.join(result)
def _fixUnescapedControlCharacters(jsonStr: str) -> str:
"""
Fix unescaped control characters in JSON strings.
JSON requires control characters (ASCII 0-31) to be escaped as \\uXXXX.
Common ones have shortcuts: \\n, \\r, \\t, \\b, \\f
This function finds unescaped control chars inside strings and escapes them.
"""
if not jsonStr or not jsonStr.strip():
return jsonStr
result = []
i = 0
inString = False
escaped = False
# Mapping of common control chars to their escape sequences
controlEscapes = {
'\n': '\\n',
'\r': '\\r',
'\t': '\\t',
'\b': '\\b',
'\f': '\\f',
}
while i < len(jsonStr):
char = jsonStr[i]
if escaped:
result.append(char)
escaped = False
i += 1
continue
if char == '\\' and inString:
result.append(char)
escaped = True
i += 1
continue
if char == '"':
inString = not inString
result.append(char)
i += 1
continue
if inString:
# Check for control characters (ASCII 0-31)
if ord(char) < 32:
if char in controlEscapes:
result.append(controlEscapes[char])
else:
# Use \uXXXX format for other control chars
result.append(f'\\u{ord(char):04x}')
i += 1
continue
result.append(char)
i += 1
return ''.join(result)
def _tryParseJson(jsonStr: str) -> tuple:
"""
Try to parse JSON string and return (parsed, error).
Returns:
Tuple of (parsed_object, error_string)
- If successful: (parsed_object, None)
- If failed: (None, error_message)
"""
if not jsonStr or not jsonStr.strip():
return None, "Empty JSON string"
try:
parsed = json.loads(jsonStr)
return parsed, None
except json.JSONDecodeError as e:
return None, str(e)
except Exception as e:
return None, str(e)
# Convenience function with named results
def getContexts(
    truncatedJson: str
) -> JsonContinuationContexts:
    """
    Return all continuation contexts as a model with named fields.

    Steps:
    1. Check whether the input already parses as JSON (i.e. has no cut point).
    2. Extract the contexts via `extractContinuationContexts`.
    3. If the input was already complete, blank out the overlap context.
    4. Try to parse the complete part; on failure run
       `_repairInternalJsonErrors` and try again. The repaired text replaces
       the complete part only if it parses.

    IMPORTANT: overlapContext == "" signals that the JSON is complete and no
    continuation is expected.

    Args:
        truncatedJson: The (possibly) truncated JSON string

    Returns:
        JsonContinuationContexts with:
        - overlapContext: innermost object/element containing the cut,
          or "" if the input is complete
        - hierarchyContext: structure WITHOUT budget limitations
        - hierarchyContextForPrompt: structure WITH budget limitations
        - completePart: the closed JSON (repaired version if repair succeeded)
        - jsonParsingSuccess: True if completePart parses as JSON

    Example:
        >>> contexts = getContexts('{"users": [{"name": "John", "bio": "Hello Wor')
        >>> print(contexts.overlapContext)      # context around the cut point
        >>> print(contexts.jsonParsingSuccess)

        >>> contexts = getContexts('{"users": [{"name": "John"}]}')
        >>> print(contexts.overlapContext)      # "" (input is complete)
        >>> print(contexts.jsonParsingSuccess)  # True
    """
    # Step 1: is the input parseable as it stands?
    alreadyComplete = False
    if truncatedJson and truncatedJson.strip():
        _, inputError = _tryParseJson(truncatedJson.strip())
        if inputError is None:
            alreadyComplete = True
            logger.debug("Original JSON is already complete (no cut point)")

    # Step 2: extract the contexts
    overlap, hierarchy, hierarchyForPrompt, completePart = extractContinuationContexts(truncatedJson)

    # Step 3: an empty overlap tells the caller that nothing needs continuing
    if alreadyComplete:
        overlap = ""
        logger.debug("Setting overlapContext='' (JSON is complete)")

    # Step 4: validate the complete part, repairing it if necessary
    parsedOk = False
    if completePart and completePart.strip():
        _, firstError = _tryParseJson(completePart)
        if firstError is None:
            parsedOk = True
        else:
            logger.debug(f"Initial parse failed: {firstError}, attempting repair")
            candidate = _repairInternalJsonErrors(completePart)
            _, secondError = _tryParseJson(candidate)
            parsedOk = secondError is None
            if parsedOk:
                # Repair succeeded: hand out the repaired text.
                completePart = candidate
                logger.debug("JSON repair successful")
            else:
                # Repair failed too: keep the unrepaired complete part.
                logger.debug(f"JSON repair also failed: {secondError}")

    return JsonContinuationContexts(
        overlapContext=overlap,
        hierarchyContext=hierarchy,
        hierarchyContextForPrompt=hierarchyForPrompt,
        completePart=completePart,
        jsonParsingSuccess=parsedOk
    )