""" JSON Continuation Context Module Generiert drei Kontexte für abgeschnittene JSON-Strings: 1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält 2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut mit Budget-Logik 3. Complete Part: Der vollständige Teil des JSONs mit allen Strukturen geschlossen Hauptfunktionen: - extractContinuationContexts(truncatedJson: str) -> Tuple[str, str, str] Extrahiert alle drei Kontexte aus einem abgeschnittenen JSON-String. - getContexts(truncatedJson: str) -> JsonContinuationContexts Gibt alle Kontexte als Pydantic-Modell zurück mit benannten Feldern. Modulkonstanten: - BUDGET_LIMIT: int = 500 Zeichen-Budget für vollständige Datenwerte im Hierarchy Context - OVERLAP_MAX_CHARS: int = 1000 Maximale Zeichen für den Overlap Context Verwendung: >>> from modules.shared.jsonContinuation import getContexts >>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor' >>> contexts = getContexts(jsonStr) >>> print(contexts.overlapContext) >>> print(contexts.hierarchyContext) >>> print(contexts.completePart) Autor: Claude Version: 2.0 """ import json import logging import re from typing import Tuple, List, Optional, Any, Set from dataclasses import dataclass, field from enum import Enum from modules.datamodels.datamodelAi import JsonContinuationContexts logger = logging.getLogger(__name__) # ============================================================================= # MODULE CONSTANTS # ============================================================================= BUDGET_LIMIT: int = 2000 """Zeichen-Budget für vollständige Datenwerte im Hierarchy Context""" OVERLAP_MAX_CHARS: int = 1000 """Maximale Zeichen für den Overlap Context""" # ============================================================================= # TOKEN TYPES AND DATA CLASSES # ============================================================================= class JsonTokenType(Enum): """JSON Token Types""" OBJECT_START = "{" OBJECT_END = "}" ARRAY_START = "[" ARRAY_END = "]" STRING = "string" NUMBER = "number" BOOLEAN = "boolean" NULL = "null" COLON = ":" COMMA = "," KEY = "key" EOF = "eof" TRUNCATED = "truncated" @dataclass class JsonToken: """Represents a JSON token with position info""" type: JsonTokenType value: Any start_pos: int end_pos: int raw: str # Original string representation @dataclass class StackFrame: """Represents a level in the JSON hierarchy""" type: str # "object" or "array" start_pos: int key: Optional[str] = None # Current key for objects index: int = 0 # Current index for arrays content: str = "" # Accumulated content for this frame keys_seen: List[str] = None # Keys seen in this object def __post_init__(self): if self.keys_seen is None: self.keys_seen = [] class JsonTokenizer: """Tokenizer for potentially truncated JSON strings""" def __init__(self, jsonStr: str): self.jsonStr = jsonStr self.pos = 0 self.length = len(jsonStr) def skipWhitespace(self): """Skip whitespace characters""" while self.pos < self.length and self.jsonStr[self.pos] in ' \t\n\r': self.pos += 1 def peek(self) -> Optional[str]: """Peek at current character without consuming""" if self.pos < self.length: return self.jsonStr[self.pos] return None def readString(self) -> JsonToken: """Read a JSON string token""" start_pos = self.pos self.pos += 1 # Skip opening quote escaped = False while self.pos < self.length: char = self.jsonStr[self.pos] if escaped: escaped = False self.pos += 1 elif char == '\\': escaped = True self.pos += 1 elif char == '"': self.pos += 1 raw = self.jsonStr[start_pos:self.pos] try: # Try to decode the string value value = raw[1:-1] # Remove quotes for value except: value = raw return JsonToken(JsonTokenType.STRING, value, start_pos, self.pos, raw) else: self.pos += 1 # String was truncated raw = self.jsonStr[start_pos:self.pos] return JsonToken(JsonTokenType.TRUNCATED, raw[1:] if len(raw) > 1 else "", start_pos, self.pos, raw) def readNumber(self) -> JsonToken: """Read a JSON number token""" start_pos = self.pos # Handle negative if self.pos < self.length and self.jsonStr[self.pos] == '-': self.pos += 1 # Read digits while self.pos < self.length and self.jsonStr[self.pos].isdigit(): self.pos += 1 # Decimal part if self.pos < self.length and self.jsonStr[self.pos] == '.': self.pos += 1 while self.pos < self.length and self.jsonStr[self.pos].isdigit(): self.pos += 1 # Exponent if self.pos < self.length and self.jsonStr[self.pos] in 'eE': self.pos += 1 if self.pos < self.length and self.jsonStr[self.pos] in '+-': self.pos += 1 while self.pos < self.length and self.jsonStr[self.pos].isdigit(): self.pos += 1 raw = self.jsonStr[start_pos:self.pos] try: value = float(raw) if '.' in raw or 'e' in raw.lower() else int(raw) except ValueError: value = raw return JsonToken(JsonTokenType.NUMBER, value, start_pos, self.pos, raw) def readKeyword(self) -> JsonToken: """Read true, false, or null""" start_pos = self.pos for keyword, token_type in [('true', JsonTokenType.BOOLEAN), ('false', JsonTokenType.BOOLEAN), ('null', JsonTokenType.NULL)]: if self.jsonStr[self.pos:].startswith(keyword): self.pos += len(keyword) value = True if keyword == 'true' else (False if keyword == 'false' else None) return JsonToken(token_type, value, start_pos, self.pos, keyword) # Partial keyword (truncated) while self.pos < self.length and self.jsonStr[self.pos].isalpha(): self.pos += 1 raw = self.jsonStr[start_pos:self.pos] return JsonToken(JsonTokenType.TRUNCATED, raw, start_pos, self.pos, raw) def nextJsonToken(self) -> JsonToken: """Get the next token""" self.skipWhitespace() if self.pos >= self.length: return JsonToken(JsonTokenType.EOF, None, self.pos, self.pos, "") char = self.jsonStr[self.pos] startPos = self.pos if char == '{': self.pos += 1 return JsonToken(JsonTokenType.OBJECT_START, '{', startPos, self.pos, '{') elif char == '}': self.pos += 1 return JsonToken(JsonTokenType.OBJECT_END, '}', startPos, self.pos, '}') elif char == '[': self.pos += 1 return JsonToken(JsonTokenType.ARRAY_START, '[', startPos, self.pos, '[') elif char == ']': self.pos += 1 return JsonToken(JsonTokenType.ARRAY_END, ']', startPos, self.pos, ']') elif char == ':': self.pos += 1 return JsonToken(JsonTokenType.COLON, ':', startPos, self.pos, ':') elif char == ',': self.pos += 1 return JsonToken(JsonTokenType.COMMA, ',', startPos, self.pos, ',') elif char == '"': return self.readString() elif char == '-' or char.isdigit(): return self.readNumber() elif char.isalpha(): return self.readKeyword() else: # Unknown character, treat as truncated self.pos += 1 return JsonToken(JsonTokenType.TRUNCATED, char, startPos, self.pos, char) @dataclass class HierarchyLevel: """Represents one level in the parsed hierarchy""" type: str # "object" or "array" start_pos: int end_pos: int # -1 if not closed key: Optional[str] # Key if this is a value in an object index: Optional[int] # Index if this is in an array content: dict # Parsed content at this level raw_start: str # Raw string from start to children children_content: List[Any] # For arrays: list of parsed elements def getJsonContinuationContext( truncatedJson: str, budgetLimit: Optional[int] = None, overlapMaxChars: Optional[int] = None ) -> Tuple[str, str, str, str]: """ Generate continuation contexts for a truncated JSON string. Generiert vier Kontexte für abgeschnittene JSON-Strings: 1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält 2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut OHNE Budget-Limits (für interne Nutzung) 3. Hierarchy Context For Prompt: Die hierarchische Struktur vom Root bis zum Cut MIT Budget-Limits (für Prompts) 4. Complete Part: Der vollständige Teil des JSONs mit allen Strukturen geschlossen Args: truncatedJson: The truncated JSON string budgetLimit: Character budget for data values in hierarchy context (uses BUDGET_LIMIT if None) overlapMaxChars: Maximum characters for overlap context (uses OVERLAP_MAX_CHARS if None) Returns: Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart): - overlapContext: The innermost object/element containing the cut (for merging) - hierarchyContext: Full structure from root to cut WITHOUT budget limitations (for internal use) - hierarchyContextForPrompt: Full structure from root to cut WITH budget limitations (for prompts) - completePart: Valid JSON with all structures properly closed """ if budgetLimit is None: budgetLimit = BUDGET_LIMIT if overlapMaxChars is None: overlapMaxChars = OVERLAP_MAX_CHARS analyzer = JsonAnalyzer(truncatedJson, budgetLimit, overlapMaxChars) return analyzer.analyze() @dataclass class BudgetAllocation: """Tracks which nodes have been allocated budget""" allocated_node_ids: Set[int] = field(default_factory=set) path_node_ids: Set[int] = field(default_factory=set) summary_mode: bool = False class JsonAnalyzer: """ Analyzes truncated JSON and generates continuation contexts. Generates three contexts for truncated JSON strings: 1. Overlap Context: The innermost object/array element containing the cut point 2. Hierarchy Context: The hierarchical structure from root to cut with budget logic 3. Complete Part: The complete part of the JSON with all structures properly closed """ def __init__(self, jsonStr: str, budgetLimit: Optional[int] = None, overlapMaxChars: Optional[int] = None): self.jsonStr = jsonStr self.budgetLimit = budgetLimit if budgetLimit is not None else BUDGET_LIMIT self.overlapMaxChars = overlapMaxChars if overlapMaxChars is not None else OVERLAP_MAX_CHARS self.stack: List[StackFrame] = [] self.hierarchy: List[dict] = [] # Parsed hierarchy info def analyze(self) -> Tuple[str, str, str]: """ Analyze the truncated JSON and return all three contexts. Returns: Tuple of (overlapContext, hierarchyContext, completePart) """ # Parse and track the structure self._parseStructure() # Generate overlap context overlapContext = self._generateOverlapContext() # Parse structure for hierarchy (needed for both contexts) structure = self._parseForHierarchy() cutPos = len(self.jsonStr) # Build both hierarchy contexts from the SAME structure BEFORE generating complete part # CRITICAL: hierarchyContext must be the EXACT original JSON (for merge overlap detection!) # The rendered version would have different formatting, breaking overlap matching hierarchyContext = self.jsonStr # Generate hierarchy context WITH budget (for prompts) - uses same structure hierarchyContextForPrompt = self._renderWithBudgetFromStructure(structure, cutPos) # Generate complete part (JSON with all structures closed) completePart = self._generateCompletePart() return overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart def _generateCompletePart(self) -> str: """ Generate the complete part of the JSON with all structures properly closed. This creates valid JSON by closing all open strings, brackets/braces. Unvollständige Keys werden entfernt, damit das Ergebnis valides JSON ist. Unvollständige Keywords (true, false, null) werden vervollständigt. Strategy: 1. Take the full truncated JSON 2. If we're in the middle of a string, close it 3. Complete incomplete keywords (tr → true, f → false, n → null) 4. Remove incomplete key-value pairs (keys without values) 5. Close all open brackets/braces """ result = self.jsonStr.rstrip() # Remove trailing comma if present (after stripping) if result.endswith(','): result = result[:-1] # Check if we need to close an open string stringClosing = self._getStringClosing(result) result += stringClosing # Complete incomplete keywords (true, false, null) result = self._completeIncompleteKeywords(result) # Check if we're in the middle of a key (after colon) # If string was just closed and we're after a colon with no value, remove the key result = self._cleanIncompleteKeyValue(result) # Close all open structures closingBrackets = self._getClosingBrackets(result) return result + closingBrackets def _getStringClosing(self, jsonStr: str) -> str: """Check if there's an unclosed string and return closing quote if needed.""" in_string = False escaped = False for char in jsonStr: if escaped: escaped = False continue if char == '\\' and in_string: escaped = True continue if char == '"': in_string = not in_string return '"' if in_string else "" def _cleanIncompleteKeyValue(self, jsonStr: str) -> str: """ Clean up incomplete key-value pairs. Handles cases like: - {"key": "incompl -> keep (valid truncated value) - {"key": -> remove key - {"a": 1, "key -> remove incomplete key (was in middle of key name) """ stripped = jsonStr.rstrip() # Pattern: ends with colon (possibly with whitespace) - incomplete value if stripped.endswith(':'): # Find the start of this key and remove the whole key-value return self._removeLastKey(stripped) # Check if we just closed a string that was an incomplete key # Pattern: ..., "something" or { "something" where something has no colon after # This happens when we close a truncated key name like "add" -> "add" if stripped.endswith('"'): # Look for the pattern: comma/bracket + whitespace + "string" # and check if this was supposed to be a key if self._isIncompleteKey(stripped): return self._removeLastKey(stripped) return jsonStr def _completeIncompleteKeywords(self, jsonStr: str) -> str: """ Complete incomplete JSON keywords at the end of the string. Checks the last element for incomplete keywords after colon: - ": t*" or ": f*" or ": n*" -> complete to true/false/null - ": " or ":" (without keyword) -> set to null """ result = jsonStr.rstrip() # Find the last colon (not in string) in_string = False escaped = False last_colon_pos = -1 for i in range(len(result) - 1, -1, -1): char = result[i] if escaped: escaped = False continue if char == '\\' and in_string: escaped = True continue if char == '"': in_string = not in_string continue if not in_string and char == ':': last_colon_pos = i break if last_colon_pos < 0: return result # Get text after the last colon after_colon = result[last_colon_pos + 1:].strip() # Check for incomplete keyword patterns if after_colon.startswith('t') or after_colon.startswith('T'): # Incomplete true keyword_start = last_colon_pos + 1 # Skip whitespace while keyword_start < len(result) and result[keyword_start] in ' \t\n\r': keyword_start += 1 # Remove partial keyword keyword_end = keyword_start + 1 while keyword_end < len(result) and result[keyword_end].isalpha(): keyword_end += 1 return result[:keyword_start] + 'true' + result[keyword_end:] elif after_colon.startswith('f') or after_colon.startswith('F'): # Incomplete false keyword_start = last_colon_pos + 1 while keyword_start < len(result) and result[keyword_start] in ' \t\n\r': keyword_start += 1 keyword_end = keyword_start + 1 while keyword_end < len(result) and result[keyword_end].isalpha(): keyword_end += 1 return result[:keyword_start] + 'false' + result[keyword_end:] elif after_colon.startswith('n') or after_colon.startswith('N'): # Incomplete null keyword_start = last_colon_pos + 1 while keyword_start < len(result) and result[keyword_start] in ' \t\n\r': keyword_start += 1 keyword_end = keyword_start + 1 while keyword_end < len(result) and result[keyword_end].isalpha(): keyword_end += 1 return result[:keyword_start] + 'null' + result[keyword_end:] elif not after_colon or after_colon == '': # No keyword after colon -> set to null return result + 'null' return result def _isIncompleteKey(self, jsonStr: str) -> bool: """ Check if the last string in the JSON is an incomplete key in an object. This happens when truncation occurred in the middle of a key name. Only applies to objects, not arrays. """ # Find the last complete string pos = len(jsonStr) - 1 if jsonStr[pos] != '"': return False # Find the opening quote of this string stringStart = pos - 1 while stringStart >= 0: if jsonStr[stringStart] == '"': # Check it's not escaped numBackslashes = 0 checkPos = stringStart - 1 while checkPos >= 0 and jsonStr[checkPos] == '\\': numBackslashes += 1 checkPos -= 1 if numBackslashes % 2 == 0: break stringStart -= 1 if stringStart < 0: return False # Now stringStart points to opening quote # Check what's before it (skip whitespace) beforePos = stringStart - 1 while beforePos >= 0 and jsonStr[beforePos] in ' \t\n\r': beforePos -= 1 if beforePos < 0: return False # For this to be an incomplete key, it must be preceded by { or , # AND we must be inside an object (not an array) if jsonStr[beforePos] not in ',{': return False # Now check if we're in an object context (not array) # Count open braces/brackets to determine context braceCount = 0 bracketCount = 0 inString = False for i in range(beforePos + 1): char = jsonStr[i] if char == '"' and (i == 0 or jsonStr[i-1] != '\\'): inString = not inString elif not inString: if char == '{': braceCount += 1 elif char == '}': braceCount -= 1 elif char == '[': bracketCount += 1 elif char == ']': bracketCount -= 1 # If we have more open braces than brackets at this point, # we're in an object context # Actually, we need to check the innermost container # Let's track the stack properly stack = [] inString = False for i in range(beforePos + 1): char = jsonStr[i] if char == '"' and (i == 0 or jsonStr[i-1] != '\\'): inString = not inString elif not inString: if char == '{': stack.append('object') elif char == '[': stack.append('array') elif char == '}': if stack and stack[-1] == 'object': stack.pop() elif char == ']': if stack and stack[-1] == 'array': stack.pop() # If innermost container is an object, this is an incomplete key return len(stack) > 0 and stack[-1] == 'object' def _removeLastKey(self, jsonStr: str) -> str: """Remove the last incomplete key-value pair from the JSON string.""" stripped = jsonStr.rstrip() # Find the last comma or opening bracket before the incomplete key pos = len(stripped) - 1 # Skip past the current string/key in_string = False while pos >= 0: char = stripped[pos] if char == '"' and (pos == 0 or stripped[pos-1] != '\\'): in_string = not in_string if not in_string and char in ',{': break pos -= 1 if pos < 0: return stripped if stripped[pos] == ',': # Remove from comma onwards return stripped[:pos] elif stripped[pos] == '{': # Keep the opening brace return stripped[:pos+1] return stripped def _findLastCompletePosition(self) -> int: """Find the position of the last complete value in the JSON.""" tokenizer = JsonTokenizer(self.jsonStr) last_complete_pos = 0 stack_depth = 0 last_value_end = 0 in_value = False while True: token = tokenizer.nextJsonToken() if token.type == JsonTokenType.EOF: break if token.type == JsonTokenType.TRUNCATED: # Return position before the truncated part break if token.type in (JsonTokenType.OBJECT_START, JsonTokenType.ARRAY_START): stack_depth += 1 in_value = True elif token.type in (JsonTokenType.OBJECT_END, JsonTokenType.ARRAY_END): stack_depth -= 1 last_value_end = token.end_pos in_value = False elif token.type == JsonTokenType.STRING: # Check if this is a key or a value saved_pos = tokenizer.pos tokenizer.skipWhitespace() next_char = tokenizer.peek() tokenizer.pos = saved_pos if next_char != ':': # It's a value last_value_end = token.end_pos in_value = False elif token.type in (JsonTokenType.NUMBER, JsonTokenType.BOOLEAN, JsonTokenType.NULL): last_value_end = token.end_pos in_value = False elif token.type == JsonTokenType.COMMA: # After a comma, we've completed a value last_complete_pos = last_value_end # Return the last complete position return last_value_end if last_value_end > 0 else len(self.jsonStr) def _getClosingBrackets(self, jsonStr: str) -> str: """Determine what closing brackets are needed.""" stack = [] in_string = False escaped = False for char in jsonStr: if escaped: escaped = False continue if char == '\\' and in_string: escaped = True continue if char == '"': in_string = not in_string continue if in_string: continue if char == '{': stack.append('}') elif char == '[': stack.append(']') elif char == '}': if stack and stack[-1] == '}': stack.pop() elif char == ']': if stack and stack[-1] == ']': stack.pop() # Return closing brackets in reverse order return ''.join(reversed(stack)) def _parseStructure(self): """Parse the JSON structure and track hierarchy""" tokenizer = JsonTokenizer(self.jsonStr) while True: token = tokenizer.nextJsonToken() if token.type == JsonTokenType.EOF or token.type == JsonTokenType.TRUNCATED: break if token.type == JsonTokenType.OBJECT_START: frame = StackFrame( type="object", start_pos=token.start_pos, keys_seen=[] ) self.stack.append(frame) elif token.type == JsonTokenType.ARRAY_START: frame = StackFrame( type="array", start_pos=token.start_pos, index=0 ) self.stack.append(frame) elif token.type == JsonTokenType.OBJECT_END: if self.stack and self.stack[-1].type == "object": self.stack.pop() elif token.type == JsonTokenType.ARRAY_END: if self.stack and self.stack[-1].type == "array": self.stack.pop() elif token.type == JsonTokenType.STRING: # Could be a key or a value self._handleStringJsonToken(token, tokenizer) elif token.type == JsonTokenType.COMMA: # Increment array index if self.stack and self.stack[-1].type == "array": self.stack[-1].index += 1 def _handleStringJsonToken(self, token: JsonToken, tokenizer: JsonTokenizer): """Handle a string token (could be key or value)""" if self.stack and self.stack[-1].type == "object": # Check if this is a key (followed by colon) saved_pos = tokenizer.pos tokenizer.skipWhitespace() next_char = tokenizer.peek() if next_char == ':': # This is a key self.stack[-1].key = token.value self.stack[-1].keys_seen.append(token.value) tokenizer.pos = saved_pos def _generateOverlapContext(self) -> str: """ Generate the overlap context - the innermost object/array element containing the cut. Returns the raw string from the start of that element to the end of the truncated JSON. Dieser Kontext wird verwendet, um den abgeschnittenen Teil mit dem neuen Teil zu mergen. Exakt so wie im Original-String (für String-Matching beim Merge). SPECIAL CASE: If cut point is within a list element, return the entire list object (from opening bracket). """ if not self.stack: # No structure, return last overlap_max_chars characters return self.jsonStr[-self.overlapMaxChars:] # Find the innermost container that should be the overlap innermost = self.stack[-1] # SPECIAL CASE: If innermost is an array, return the entire array (from opening bracket) if innermost.type == "array": overlap_start = innermost.start_pos else: # For objects, use the standard logic overlap_start = self._findInnermostElementStart() overlap = self.jsonStr[overlap_start:] # Apply max chars limit if len(overlap) > self.overlapMaxChars: overlap = self.jsonStr[-self.overlapMaxChars:] return overlap def _findAllArrayElementStarts(self, arrayFrame: StackFrame) -> List[int]: """Find all element start positions in an array""" arrayContent = self.jsonStr[arrayFrame.start_pos:] # Skip the opening bracket and whitespace pos = 1 while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r': pos += 1 elementStarts = [arrayFrame.start_pos + pos] depth = 0 inString = False escaped = False i = pos while i < len(arrayContent): char = arrayContent[i] if escaped: escaped = False i += 1 continue if char == '\\' and inString: escaped = True i += 1 continue if char == '"': inString = not inString i += 1 continue if inString: i += 1 continue if char in '{[': depth += 1 elif char in '}]': depth -= 1 elif char == ',' and depth == 0: # Found element boundary i += 1 # Skip whitespace while i < len(arrayContent) and arrayContent[i] in ' \t\n\r': i += 1 elementStarts.append(arrayFrame.start_pos + i) i += 1 return elementStarts def _findInnermostElementStart(self) -> int: """Find the start position of the innermost element for overlap""" if not self.stack: return max(0, len(self.jsonStr) - self.overlapMaxChars) # Walk through stack to find the innermost array element or object # We want the innermost "atomic" unit that contains the cut # Strategy: # - If innermost is an object: return its start # - If innermost is an array: # - If current element is an object/array: return start of that element # - If current element is a primitive: return start of array or last N chars innermost = self.stack[-1] if innermost.type == "object": return innermost.start_pos else: # It's an array - find the start of the current element element_start = self._findArrayElementStart(innermost) # Check if the element is a primitive or complex type element_content = self.jsonStr[element_start:].strip() # If it starts with { or [ it's complex, return the element start if element_content and element_content[0] in '{[': return element_start else: # Primitive in array - check if there's a parent object # or return overlap_max_chars from end for i in range(len(self.stack) - 2, -1, -1): if self.stack[i].type == "object": return self.stack[i].start_pos # No parent object, return max chars from end return max(0, len(self.jsonStr) - self.overlapMaxChars) def _findArrayElementStart(self, arrayFrame: StackFrame) -> int: """Find the start position of the current array element""" # We need to find the start of the current element in the array # Parse from array start to find element boundaries arrayContent = self.jsonStr[arrayFrame.start_pos:] # Skip the opening bracket and whitespace pos = 1 while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r': pos += 1 elementStarts = [arrayFrame.start_pos + pos] depth = 0 inString = False escaped = False i = pos while i < len(arrayContent): char = arrayContent[i] if escaped: escaped = False i += 1 continue if char == '\\' and inString: escaped = True i += 1 continue if char == '"': inString = not inString i += 1 continue if inString: i += 1 continue if char in '{[': depth += 1 elif char in '}]': depth -= 1 elif char == ',' and depth == 0: # Found element boundary i += 1 # Skip whitespace while i < len(arrayContent) and arrayContent[i] in ' \t\n\r': i += 1 elementStarts.append(arrayFrame.start_pos + i) i += 1 # Return the start of the current element if arrayFrame.index < len(elementStarts): return elementStarts[arrayFrame.index] elif elementStarts: return elementStarts[-1] else: return arrayFrame.start_pos def _generateHierarchyContext(self) -> str: """ Generate the hierarchy context with budget logic. Shows structure from root to cut point with data values limited by budget. """ if not self.stack: # No structure return self.jsonStr[-self.overlapMaxChars:] # We need to rebuild the JSON with budget logic # Priority: elements closer to cut get full values, distant ones get "..." return self._rebuildWithBudget() def _rebuildWithBudget(self) -> str: """Rebuild JSON from root to cut with budget constraints""" # Strategy: # 1. Parse the JSON structure tracking all values # 2. Calculate total value size # 3. Apply budget from cut backwards # 4. Render with "..." for values outside budget # First, get a structured representation structure = self._parseForHierarchy() # Now render with budget return self._renderWithBudget(structure) def _parseForHierarchy(self) -> dict: """Parse JSON into a structure suitable for hierarchy rendering""" result = { 'type': 'root', 'children': [], 'raw_positions': [] } tokenizer = JsonTokenizer(self.jsonStr) stack = [result] current_key = None while True: token = tokenizer.nextJsonToken() if token.type == JsonTokenType.EOF: break if token.type == JsonTokenType.TRUNCATED: # Mark the truncation point if stack: current = stack[-1] if current.get('type') == 'object': if current_key: current['children'].append({ 'type': 'truncated_value', 'key': current_key, 'raw': self.jsonStr[token.start_pos:], 'start_pos': token.start_pos }) elif current.get('type') == 'array': current['children'].append({ 'type': 'truncated_value', 'raw': self.jsonStr[token.start_pos:], 'start_pos': token.start_pos }) break if token.type == JsonTokenType.OBJECT_START: obj = { 'type': 'object', 'key': current_key, 'children': [], 'start_pos': token.start_pos } if stack: stack[-1]['children'].append(obj) stack.append(obj) current_key = None elif token.type == JsonTokenType.ARRAY_START: arr = { 'type': 'array', 'key': current_key, 'children': [], 'start_pos': token.start_pos } if stack: stack[-1]['children'].append(arr) stack.append(arr) current_key = None elif token.type == JsonTokenType.OBJECT_END: if len(stack) > 1 and stack[-1].get('type') == 'object': stack[-1]['end_pos'] = token.end_pos stack[-1]['complete'] = True stack.pop() elif token.type == JsonTokenType.ARRAY_END: if len(stack) > 1 and stack[-1].get('type') == 'array': stack[-1]['end_pos'] = token.end_pos stack[-1]['complete'] = True stack.pop() elif token.type == JsonTokenType.STRING: # Check if it's a key saved_pos = tokenizer.pos tokenizer.skipWhitespace() next_char = tokenizer.peek() if next_char == ':' and stack and stack[-1].get('type') == 'object': current_key = token.value else: # It's a value value_node = { 'type': 'value', 'key': current_key, 'value': token.value, 'raw': token.raw, 'start_pos': token.start_pos, 'end_pos': token.end_pos, 'value_type': 'string' } if stack: stack[-1]['children'].append(value_node) current_key = None tokenizer.pos = saved_pos elif token.type in (JsonTokenType.NUMBER, JsonTokenType.BOOLEAN, JsonTokenType.NULL): value_node = { 'type': 'value', 'key': current_key, 'value': token.value, 'raw': token.raw, 'start_pos': token.start_pos, 'end_pos': token.end_pos, 'value_type': str(token.type.value) } if stack: stack[-1]['children'].append(value_node) current_key = None return result def _renderWithBudget(self, structure: dict) -> str: """Render the structure with budget constraints""" # First, collect all value nodes with their distances from cut cutPos = len(self.jsonStr) allValues = self._collectValuesWithDistance(structure, cutPos) # Sort by distance (closest to cut first) allValues.sort(key=lambda x: x['distance']) # Determine which values get full rendering budgetRemaining = self.budgetLimit valuesWithBudget = set() for valInfo in allValues: valSize = len(str(valInfo['raw'])) if budgetRemaining >= valSize: valuesWithBudget.add(valInfo['id']) budgetRemaining -= valSize # Now render the structure return self._renderNode(structure, valuesWithBudget, indent=0) def _collectValuesWithDistance(self, node: dict, cutPos: int, depth: int = 0) -> list: """Collect all value nodes with their distance from cut point""" values = [] if node.get('type') == 'value': endPos = node.get('end_pos', cutPos) distance = cutPos - endPos values.append({ 'id': id(node), 'node': node, 'distance': distance, 'raw': node.get('raw', ''), 'depth': depth }) elif node.get('type') == 'truncated_value': values.append({ 'id': id(node), 'node': node, 'distance': 0, # Truncated values are at the cut 'raw': node.get('raw', ''), 'depth': depth }) for child in node.get('children', []): values.extend(self._collectValuesWithDistance(child, cutPos, depth + 1)) return values def _renderNode(self, node: dict, valuesWithBudget: set, indent: int = 0) -> str: """Render a node with budget constraints""" indent_str = " " * indent node_type = node.get('type') if node_type == 'root': parts = [] for child in node.get('children', []): parts.append(self._renderNode(child, valuesWithBudget, indent)) return '\n'.join(parts) elif node_type == 'object': return self._renderObject(node, valuesWithBudget, indent) elif node_type == 'array': return self._renderArray(node, valuesWithBudget, indent) elif node_type == 'value': return self._renderValue(node, valuesWithBudget, indent) elif node_type == 'truncated_value': return node.get('raw', '') return '' def _renderObject(self, node: dict, valuesWithBudget: set, indent: int) -> str: """Render an object node""" indent_str = " " * indent inner_indent = " " * (indent + 1) key_prefix = "" if node.get('key'): key_prefix = f'"{node["key"]}": ' if not node.get('children'): if node.get('complete'): return f"{key_prefix}{{}}" else: return f"{key_prefix}{{" parts = [f"{key_prefix}{{"] children = node.get('children', []) for i, child in enumerate(children): child_rendered = self._renderNode(child, valuesWithBudget, indent + 1) # Add comma if not last and next sibling exists if i < len(children) - 1: if child.get('type') != 'truncated_value': parts.append(f"{inner_indent}{child_rendered},") else: parts.append(f"{inner_indent}{child_rendered}") else: parts.append(f"{inner_indent}{child_rendered}") if node.get('complete'): parts.append(f"{indent_str}}}") return '\n'.join(parts) def _renderArray(self, node: dict, valuesWithBudget: set, indent: int) -> str: """Render an array node""" indent_str = " " * indent inner_indent = " " * (indent + 1) key_prefix = "" if node.get('key'): key_prefix = f'"{node["key"]}": ' if not node.get('children'): if node.get('complete'): return f"{key_prefix}[]" else: return f"{key_prefix}[" parts = [f"{key_prefix}["] children = node.get('children', []) for i, child in enumerate(children): child_rendered = self._renderNode(child, valuesWithBudget, indent + 1) if i < len(children) - 1: if child.get('type') != 'truncated_value': parts.append(f"{inner_indent}{child_rendered},") else: parts.append(f"{inner_indent}{child_rendered}") else: parts.append(f"{inner_indent}{child_rendered}") if node.get('complete'): parts.append(f"{indent_str}]") return '\n'.join(parts) def _renderValue(self, node: dict, valuesWithBudget: set, indent: int) -> str: """Render a value node""" key_prefix = "" if node.get('key'): key_prefix = f'"{node["key"]}": ' if id(node) in valuesWithBudget: # Full value default_raw = '"...\"' raw_value = node.get('raw', default_raw) return f"{key_prefix}{raw_value}" else: # Placeholder return f'{key_prefix}"..."' def _renderFromStructure(self, structure: dict) -> str: """Render full structure without budget constraints - all values shown""" # Use V3 renderer with all nodes allocated (no budget constraints) allNodeIds = set() self._collectAllNodeIds(structure, allNodeIds) emptyAllocation = BudgetAllocation( allocated_node_ids=allNodeIds, path_node_ids=set(), summary_mode=False ) return self._renderNodeV3(structure, 0, emptyAllocation) def _collectAllNodeIds(self, node: dict, result: set): """Collect all node IDs for unlimited rendering""" result.add(id(node)) for child in node.get('children', []): self._collectAllNodeIds(child, result) def _renderWithBudgetFromStructure(self, structure: dict, cutPos: int) -> str: """ Render structure with budget logic - allocate from CUT to ROOT. ALGORITHM: Phase 1: Build path from cut to root - Find the cut element (truncated value or deepest incomplete node) - Build ordered path: [cut_element, parent, grandparent, ..., root] Phase 2: Allocate budget - Collect ALL value nodes with their distance to cut - Sort by distance (smaller = closer to cut = higher priority) - Allocate budget to values in this order - When budget < 50: enable summary_mode (affects containers only) Phase 3: Render - PATH containers: always render structure - NON-PATH containers in summary_mode: render as / - Values: render if allocated, else type hint Returns: Rendered JSON string with budget constraints applied """ # Phase 1: Build path from cut to root pathFromCutToRoot = [] self._buildPathFromCutToRootV3(structure, cutPos, [], pathFromCutToRoot) pathNodeIds = set(id(node) for node in pathFromCutToRoot) # Phase 2: Collect ALL values and allocate budget allValues = [] self._collectAllValuesWithDistance(structure, cutPos, allValues) # Sort by distance (smaller = closer to cut = higher priority) allValues.sort(key=lambda x: x['distance']) # Initialize allocation tracker allocation = BudgetAllocation( path_node_ids=pathNodeIds, allocated_node_ids=set(), summary_mode=False ) remainingBudget = self.budgetLimit # Phase 2a: Allocate PATH values first (truncated values are always rendered) pathValues = [item for item in allValues if id(item['node']) in pathNodeIds] for item in pathValues: node = item['node'] nodeType = node.get('type') if nodeType == 'truncated_value': allocation.allocated_node_ids.add(id(node)) continue if nodeType != 'value': continue rawValue = node.get('raw', '') valueSize = len(rawValue) if valueSize <= remainingBudget: allocation.allocated_node_ids.add(id(node)) remainingBudget -= valueSize if remainingBudget < 50: allocation.summary_mode = True # Phase 2b: Allocate NON-PATH values (skip if path already triggered summary mode) if not allocation.summary_mode: nonPathValues = [item for item in allValues if id(item['node']) not in pathNodeIds] for item in nonPathValues: node = item['node'] nodeType = node.get('type') if nodeType != 'value': continue rawValue = node.get('raw', '') valueSize = len(rawValue) if valueSize <= remainingBudget: allocation.allocated_node_ids.add(id(node)) remainingBudget -= valueSize if remainingBudget < 50 and not allocation.summary_mode: allocation.summary_mode = True # Phase 3: Render with allocation info return self._renderNodeV3(structure, 0, allocation) def _buildPathFromCutToRootV3(self, node: dict, cutPos: int, currentPath: list, resultPath: list) -> bool: """ Recursively find the path from root to cut element, then reverse it. Result path is ordered: [cut_element, parent, ..., root] """ nodeType = node.get('type') startPos = node.get('start_pos', 0) endPos = node.get('end_pos', cutPos + 1) pathWithCurrent = currentPath + [node] for child in node.get('children', []): if self._buildPathFromCutToRootV3(child, cutPos, pathWithCurrent, resultPath): return True if nodeType == 'truncated_value': resultPath.clear() resultPath.extend(reversed(pathWithCurrent)) return True if nodeType == 'value' and startPos <= cutPos <= endPos: resultPath.clear() resultPath.extend(reversed(pathWithCurrent)) return True if nodeType in ('object', 'array') and not node.get('complete') and startPos <= cutPos: resultPath.clear() resultPath.extend(reversed(pathWithCurrent)) return True if nodeType == 'root' and not resultPath: resultPath.clear() resultPath.extend(reversed(pathWithCurrent)) return True return False def _collectAllValuesWithDistance(self, node: dict, cutPos: int, result: list, depth: int = 0): """Collect ALL value nodes with their distance to cut point.""" nodeType = node.get('type') if nodeType in ('value', 'truncated_value'): endPos = node.get('end_pos', cutPos) distance = cutPos - endPos result.append({ 'node': node, 'distance': distance, 'depth': depth }) for child in node.get('children', []): self._collectAllValuesWithDistance(child, cutPos, result, depth + 1) def _renderNodeV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str: """Render a node with budget allocation info.""" nodeType = node.get('type') if nodeType == 'root': parts = [] for child in node.get('children', []): parts.append(self._renderNodeV3(child, depth, allocation)) return '\n'.join(parts) elif nodeType == 'object': return self._renderObjectV3(node, depth, allocation) elif nodeType == 'array': return self._renderArrayV3(node, depth, allocation) elif nodeType == 'value': return self._renderValueV3(node, depth, allocation) elif nodeType == 'truncated_value': keyPrefix = f'"{node.get("key")}": ' if node.get('key') else '' return f"{keyPrefix}{node.get('raw', '')}" return '' def _renderObjectV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str: """Render object - summary mode non-path objects become .""" indentStr = " " * depth innerIndent = " " * (depth + 1) keyPrefix = f'"{node.get("key")}": ' if node.get('key') else '' children = node.get('children', []) isOnPath = id(node) in allocation.path_node_ids if allocation.summary_mode and not isOnPath: return f"{keyPrefix}" # If object is incomplete and cut is directly here (no incomplete child), # extract exact string from original JSON to preserve formatting if not node.get('complete') and node.get('start_pos') is not None: hasIncompleteChild = any( child.get('type') in ('object', 'array') and not child.get('complete') for child in children ) if not hasIncompleteChild: return self.jsonStr[node.get('start_pos'):] if not children: return f"{keyPrefix}{{}}" if node.get('complete') else f"{keyPrefix}{{" parts = [f"{keyPrefix}{{"] for i, child in enumerate(children): childRendered = self._renderNodeV3(child, depth + 1, allocation) isLast = (i == len(children) - 1) isTruncated = child.get('type') == 'truncated_value' if isLast or isTruncated: parts.append(f"{innerIndent}{childRendered}") else: parts.append(f"{innerIndent}{childRendered},") if node.get('complete'): parts.append(f"{indentStr}}}") return '\n'.join(parts) def _renderArrayV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str: """Render array - summary mode non-path arrays become . For arrays ON the path with many children, show: - First few children (for context) - ... (N items omitted) ... - Last N children (closest to cut point) """ indentStr = " " * depth innerIndent = " " * (depth + 1) keyPrefix = f'"{node.get("key")}": ' if node.get('key') else '' children = node.get('children', []) isOnPath = id(node) in allocation.path_node_ids if allocation.summary_mode and not isOnPath: return f"{keyPrefix}" # If array is incomplete and cut is directly here (no incomplete child), # extract exact string from original JSON to preserve formatting if not node.get('complete') and node.get('start_pos') is not None: hasIncompleteChild = any( child.get('type') in ('object', 'array') and not child.get('complete') for child in children ) if not hasIncompleteChild: return self.jsonStr[node.get('start_pos'):] if not children: return f"{keyPrefix}[]" if node.get('complete') else f"{keyPrefix}[" parts = [f"{keyPrefix}["] # For arrays ON PATH with many children (e.g. table rows): # Show first 3, then "...", then last N children (from bottom up, using budget) # This ensures we see context near the cut point if isOnPath and len(children) > 10 and allocation.summary_mode: showFirst = 3 # Show first 3 for context # Calculate how many from the end we can show within budget # Estimate ~80 chars per row for tables estimatedCharsPerChild = 80 budgetForEnd = max(500, self.budgetLimit // 2) # Use half budget for end children showLast = max(5, budgetForEnd // estimatedCharsPerChild) showLast = min(showLast, len(children) - showFirst - 1) # Don't overlap with first # Create a modified allocation that includes these children on path # so they don't get rendered as childrenToShow = set() for i in range(min(showFirst, len(children))): childrenToShow.add(id(children[i])) startIdx = len(children) - showLast for i in range(startIdx, len(children)): childrenToShow.add(id(children[i])) # Temporarily add children to path_node_ids originalPathIds = allocation.path_node_ids extendedPathIds = originalPathIds | childrenToShow allocation.path_node_ids = extendedPathIds # Render first N children for i in range(min(showFirst, len(children))): child = children[i] childRendered = self._renderNodeV3(child, depth + 1, allocation) parts.append(f"{innerIndent}{childRendered},") # Add ellipsis if there are omitted items omittedCount = len(children) - showFirst - showLast if omittedCount > 0: parts.append(f"{innerIndent}// ... ({omittedCount} items omitted) ...") # Render last N children (closest to cut) for i in range(startIdx, len(children)): child = children[i] childRendered = self._renderNodeV3(child, depth + 1, allocation) isLast = (i == len(children) - 1) isTruncated = child.get('type') == 'truncated_value' if isLast or isTruncated: parts.append(f"{innerIndent}{childRendered}") else: parts.append(f"{innerIndent}{childRendered},") # Restore original path_node_ids allocation.path_node_ids = originalPathIds else: # Standard rendering for small arrays or non-path arrays for i, child in enumerate(children): childRendered = self._renderNodeV3(child, depth + 1, allocation) isLast = (i == len(children) - 1) isTruncated = child.get('type') == 'truncated_value' if isLast or isTruncated: parts.append(f"{innerIndent}{childRendered}") else: parts.append(f"{innerIndent}{childRendered},") if node.get('complete'): parts.append(f"{indentStr}]") return '\n'.join(parts) def _renderValueV3(self, node: dict, depth: int, allocation: BudgetAllocation) -> str: """Render value - if allocated render full, else type hint.""" keyPrefix = f'"{node.get("key")}": ' if node.get('key') else '' rawValue = node.get('raw', '""') valueType = node.get('value_type', 'string') typeHints = { 'string': '', 'number': '', 'boolean': '', 'null': '' } typeHint = typeHints.get(valueType, '') if id(node) in allocation.allocated_node_ids: return f"{keyPrefix}{rawValue}" else: return f"{keyPrefix}{typeHint}" def _calculateDistancesForBudget(self, node: dict, cutPos: int): """Calculate distance from cut point for each value node""" if node.get('type') == 'value': endPos = node.get('end_pos', cutPos) node['distance'] = cutPos - endPos elif node.get('type') == 'truncated_value': node['distance'] = 0 # At cut point else: for child in node.get('children', []): self._calculateDistancesForBudget(child, cutPos) def _collectValuesWithDistance(self, node: dict, values: list, cutPos: int): """Collect all value nodes with their distance""" if node.get('type') == 'value': values.append({ 'node': node, 'distance': node.get('distance', cutPos), 'raw': node.get('raw', '') }) for child in node.get('children', []): self._collectValuesWithDistance(child, values, cutPos) def _isSiblingOf(self, node: dict, other: dict, structure: dict) -> bool: """Check if two nodes are siblings (same parent)""" # This is a simplified check - in practice we'd need parent tracking # For now, assume nodes at same depth with same parent are siblings return False # TODO: implement proper sibling detection if needed def _collectCompleteValues(self, node: dict) -> list: """Collect all complete (non-truncated) value nodes (strings, numbers, booleans, null)""" values = [] # Collect all value types, not just strings (needed for arrays of numbers) if node.get('type') == 'value': values.append({ 'start_pos': node['start_pos'], 'end_pos': node['end_pos'], 'raw': node['raw'], 'key': node.get('key') }) for child in node.get('children', []): values.extend(self._collectCompleteValues(child)) return values def extractContinuationContexts( truncatedJson: str ) -> Tuple[str, str, str]: """ Main entry point: Extract all three continuation contexts from a truncated JSON. Generiert drei Kontexte für abgeschnittene JSON-Strings: 1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält - Wird verwendet, um den abgeschnittenen Teil mit dem neuen Teil zu mergen - Exakt so wie im Original-String (für String-Matching beim Merge) 2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut-Punkt - Mit Budget-Logik: Näher am Cut = vollständige Werte, weiter weg = "..." Platzhalter - Gibt der AI den Kontext der gesamten JSON-Struktur 3. Complete Part: Der vollständige, valide JSON bis zum Cut-Punkt - Alle offenen Strukturen werden geschlossen (}, ], ") - Unvollständige Keys werden entfernt - Kann direkt als valides JSON geparst werden Uses module constants BUDGET_LIMIT and OVERLAP_MAX_CHARS. Args: truncatedJson: The truncated JSON string Returns: Tuple of (overlapContext, hierarchyContext, hierarchyContextForPrompt, completePart): - overlapContext: The innermost object/element containing the cut (for merging) - hierarchyContext: Full structure from root to cut WITHOUT budget limitations - hierarchyContextForPrompt: Full structure from root to cut WITH budget limitations - completePart: Valid JSON with all structures properly closed Example: >>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor' >>> overlap, hierarchy, hierarchyForPrompt, complete = extractContinuationContexts(jsonStr) >>> import json >>> parsed = json.loads(complete) # ✓ Funktioniert! """ return getJsonContinuationContext(truncatedJson) # ============================================================================= # JSON REPAIR FUNCTIONS # ============================================================================= def _repairInternalJsonErrors(jsonStr: str) -> str: """ Repair internal JSON errors WITHOUT touching incomplete structures at cut point. This function fixes common internal JSON issues: - Invalid escape sequences (e.g., \\x, \\u without proper hex) - Unescaped control characters - Invalid Unicode characters - Trailing commas before closing brackets/braces - Comments (// and /* */) - Single quotes instead of double quotes (outside of string values) - Unquoted keys IMPORTANT: Does NOT modify incomplete structures at the end of the JSON. Those are handled separately by structure closing logic. Args: jsonStr: JSON string that may have internal errors Returns: Repaired JSON string with internal errors fixed """ if not jsonStr or not jsonStr.strip(): return jsonStr result = jsonStr # Fix 1: Remove BOM and normalize whitespace at start if result.startswith('\ufeff'): result = result[1:] # Fix 2: Normalize smart quotes to straight quotes result = result.replace('"', '"').replace('"', '"') result = result.replace(''', "'").replace(''', "'") # Fix 3: Remove JavaScript-style comments (but be careful not to break strings) result = _removeJsonComments(result) # Fix 4: Fix invalid escape sequences result = _fixInvalidEscapeSequences(result) # Fix 5: Remove trailing commas before ] or } result = _removeTrailingCommas(result) # Fix 6: Fix unquoted keys (simple cases only) result = _fixUnquotedKeys(result) # Fix 7: Fix unescaped quotes inside string values # This handles AI-generated JSON with quotes like: "text with "quoted" words" result = _fixUnescapedQuotesInStrings(result) # Fix 8: Fix unescaped control characters (ASCII 0-31) result = _fixUnescapedControlCharacters(result) return result def _removeJsonComments(jsonStr: str) -> str: """Remove JavaScript-style comments from JSON, preserving strings.""" result = [] i = 0 inString = False escaped = False while i < len(jsonStr): char = jsonStr[i] if escaped: result.append(char) escaped = False i += 1 continue if char == '\\' and inString: result.append(char) escaped = True i += 1 continue if char == '"': inString = not inString result.append(char) i += 1 continue if inString: result.append(char) i += 1 continue # Check for // comment if char == '/' and i + 1 < len(jsonStr) and jsonStr[i + 1] == '/': # Skip until end of line while i < len(jsonStr) and jsonStr[i] != '\n': i += 1 continue # Check for /* */ comment if char == '/' and i + 1 < len(jsonStr) and jsonStr[i + 1] == '*': i += 2 while i + 1 < len(jsonStr): if jsonStr[i] == '*' and jsonStr[i + 1] == '/': i += 2 break i += 1 continue result.append(char) i += 1 return ''.join(result) def _fixInvalidEscapeSequences(jsonStr: str) -> str: """Fix invalid escape sequences in JSON strings.""" result = [] i = 0 inString = False while i < len(jsonStr): char = jsonStr[i] if char == '"' and (i == 0 or jsonStr[i - 1] != '\\'): inString = not inString result.append(char) i += 1 continue if inString and char == '\\' and i + 1 < len(jsonStr): nextChar = jsonStr[i + 1] # Valid JSON escape sequences: \", \\, \/, \b, \f, \n, \r, \t, \uXXXX validEscapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u'] if nextChar in validEscapes: if nextChar == 'u': # Check if followed by 4 hex digits if i + 5 < len(jsonStr) and all(c in '0123456789abcdefABCDEF' for c in jsonStr[i + 2:i + 6]): result.append(char) i += 1 continue else: # Invalid \u sequence - escape the backslash result.append('\\') result.append('\\') i += 1 continue else: result.append(char) i += 1 continue else: # Invalid escape - escape the backslash result.append('\\') result.append('\\') i += 1 continue result.append(char) i += 1 return ''.join(result) def _removeTrailingCommas(jsonStr: str) -> str: """Remove trailing commas before ] or } (not valid in JSON).""" # Pattern: comma followed by whitespace and ] or } result = re.sub(r',(\s*[}\]])', r'\1', jsonStr) return result def _fixUnquotedKeys(jsonStr: str) -> str: """ Fix simple unquoted keys in JSON objects. Only handles simple cases to avoid breaking valid JSON. """ # Pattern: { or , followed by whitespace and an unquoted identifier and : # Be conservative - only fix clear cases result = [] i = 0 inString = False escaped = False while i < len(jsonStr): char = jsonStr[i] if escaped: result.append(char) escaped = False i += 1 continue if char == '\\' and inString: result.append(char) escaped = True i += 1 continue if char == '"': inString = not inString result.append(char) i += 1 continue if inString: result.append(char) i += 1 continue # Check for unquoted key after { or , if char in '{,' and i + 1 < len(jsonStr): result.append(char) i += 1 # Skip whitespace while i < len(jsonStr) and jsonStr[i] in ' \t\n\r': result.append(jsonStr[i]) i += 1 if i >= len(jsonStr): continue # Check if next is an unquoted identifier (starts with letter or _) if jsonStr[i] not in '"{[' and (jsonStr[i].isalpha() or jsonStr[i] == '_'): # Collect the identifier keyStart = i while i < len(jsonStr) and (jsonStr[i].isalnum() or jsonStr[i] == '_'): i += 1 key = jsonStr[keyStart:i] # Skip whitespace while i < len(jsonStr) and jsonStr[i] in ' \t\n\r': i += 1 # Check if followed by : if i < len(jsonStr) and jsonStr[i] == ':': # This was an unquoted key - quote it result.append('"') result.append(key) result.append('"') else: # Not a key, put back as-is result.append(key) continue result.append(char) i += 1 return ''.join(result) def _fixUnescapedQuotesInStrings(jsonStr: str) -> str: """ Fix unescaped quotes inside JSON string values. AI often generates JSON with unescaped quotes like: "text with "quoted" words" This should be: "text with \"quoted\" words" Strategy: - Parse JSON structure to find string values - Within a string, find unescaped quotes that are followed by content that looks like it continues the string (not a : or , or } or ]) - Escape those quotes """ if not jsonStr or not jsonStr.strip(): return jsonStr result = [] i = 0 inString = False stringStart = -1 escaped = False while i < len(jsonStr): char = jsonStr[i] if escaped: result.append(char) escaped = False i += 1 continue if char == '\\' and inString: result.append(char) escaped = True i += 1 continue if char == '"': if not inString: # Starting a string inString = True stringStart = i result.append(char) i += 1 continue else: # Could be end of string OR unescaped quote inside string # Look ahead to determine nextNonSpace = i + 1 while nextNonSpace < len(jsonStr) and jsonStr[nextNonSpace] in ' \t\n\r': nextNonSpace += 1 if nextNonSpace < len(jsonStr): nextChar = jsonStr[nextNonSpace] # If next char is a structural character, this is end of string if nextChar in ':,}]': inString = False result.append(char) i += 1 continue # If next char is a quote, might be end of string followed by another string # Check if we're at a reasonable string end (has a colon or comma before next structure) if nextChar == '"': # This is end of string, start of next inString = False result.append(char) i += 1 continue # Otherwise, this quote is INSIDE the string - escape it! result.append('\\') result.append(char) i += 1 continue else: # End of JSON - this must be closing quote inString = False result.append(char) i += 1 continue result.append(char) i += 1 return ''.join(result) def _fixUnescapedControlCharacters(jsonStr: str) -> str: """ Fix unescaped control characters in JSON strings. JSON requires control characters (ASCII 0-31) to be escaped as \\uXXXX. Common ones have shortcuts: \\n, \\r, \\t, \\b, \\f This function finds unescaped control chars inside strings and escapes them. """ if not jsonStr or not jsonStr.strip(): return jsonStr result = [] i = 0 inString = False escaped = False # Mapping of common control chars to their escape sequences controlEscapes = { '\n': '\\n', '\r': '\\r', '\t': '\\t', '\b': '\\b', '\f': '\\f', } while i < len(jsonStr): char = jsonStr[i] if escaped: result.append(char) escaped = False i += 1 continue if char == '\\' and inString: result.append(char) escaped = True i += 1 continue if char == '"': inString = not inString result.append(char) i += 1 continue if inString: # Check for control characters (ASCII 0-31) if ord(char) < 32: if char in controlEscapes: result.append(controlEscapes[char]) else: # Use \uXXXX format for other control chars result.append(f'\\u{ord(char):04x}') i += 1 continue result.append(char) i += 1 return ''.join(result) def _tryParseJson(jsonStr: str) -> tuple: """ Try to parse JSON string and return (parsed, error). Returns: Tuple of (parsed_object, error_string) - If successful: (parsed_object, None) - If failed: (None, error_message) """ if not jsonStr or not jsonStr.strip(): return None, "Empty JSON string" try: parsed = json.loads(jsonStr) return parsed, None except json.JSONDecodeError as e: return None, str(e) except Exception as e: return None, str(e) # Convenience function with named results def getContexts( truncatedJson: str ) -> JsonContinuationContexts: """ Get all contexts as a Pydantic model with named fields. Uses module constants BUDGET_LIMIT and OVERLAP_MAX_CHARS. This function: 1. Extracts continuation contexts (overlap, hierarchy, completePart) 2. Tries to parse completePart as JSON 3. If parsing fails, repairs internal errors and retries 4. Sets jsonParsingSuccess to indicate if completePart is valid JSON 5. Sets overlapContext="" if JSON is complete (no cut point) IMPORTANT: overlapContext="" signals that JSON is complete (no more data expected). This happens when the original JSON is already valid (no structures needed closing). Args: truncatedJson: The truncated JSON string Returns: JsonContinuationContexts Pydantic model with: - overlapContext: The innermost object/element containing the cut Empty string "" if JSON is complete (no cut point) - hierarchyContext: Full structure WITHOUT budget limitations (for internal use) - hierarchyContextForPrompt: Full structure WITH budget limitations (for prompts) - completePart: Valid JSON with all structures properly closed - jsonParsingSuccess: True if completePart is valid parseable JSON Example: >>> json_str = '{"users": [{"name": "John", "bio": "Hello Wor' >>> contexts = getContexts(json_str) >>> print(contexts.overlapContext) # Contains cut point context >>> print(contexts.jsonParsingSuccess) >>> complete_json = '{"users": [{"name": "John"}]}' >>> contexts = getContexts(complete_json) >>> print(contexts.overlapContext) # "" (empty - JSON is complete) >>> print(contexts.jsonParsingSuccess) # True """ # Completeness must use the same pipeline as callers (fences, balanced extract, normalization). from modules.shared.jsonUtils import tryParseJson as _utils_try_parse_json jsonIsComplete = False if truncatedJson and truncatedJson.strip(): _parsed_hdr, error_hdr, _ = _utils_try_parse_json(truncatedJson) if error_hdr is None: jsonIsComplete = True logger.debug("Original JSON is already complete (no cut point)") # Extract contexts overlap, hierarchy, hierarchyForPrompt, completePart = extractContinuationContexts(truncatedJson) # If JSON is complete (no cut point), set overlapContext to empty string # This signals that no more continuation is needed if jsonIsComplete: overlap = "" logger.debug("Setting overlapContext='' (JSON is complete)") # Try to parse completePart as JSON jsonParsingSuccess = False if completePart and completePart.strip(): parsed, error, _ = _utils_try_parse_json(completePart) if error is None: jsonParsingSuccess = True else: logger.debug(f"Initial parse failed: {error}, attempting internal repair") repairedCompletePart = _repairInternalJsonErrors(completePart) parsed, error, _ = _utils_try_parse_json(repairedCompletePart) if error is None: completePart = repairedCompletePart jsonParsingSuccess = True logger.debug("JSON repair successful") else: logger.debug(f"JSON repair also failed: {error}") jsonParsingSuccess = False # If completePart parses successfully, the merged/candidate JSON is structurally complete # after repair/closing — overlap from extractContinuationContexts on the *raw* candidate # would falsely signal truncation and trap callAiWithLooping in continuation iterations. if jsonParsingSuccess: overlap = "" return JsonContinuationContexts( overlapContext=overlap, hierarchyContext=hierarchy, hierarchyContextForPrompt=hierarchyForPrompt, completePart=completePart, jsonParsingSuccess=jsonParsingSuccess )