""" JSON Continuation Context Module Generiert drei Kontexte für abgeschnittene JSON-Strings: 1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält 2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut mit Budget-Logik 3. Complete Part: Der vollständige Teil des JSONs mit allen Strukturen geschlossen Hauptfunktionen: - extractContinuationContexts(truncatedJson: str) -> Tuple[str, str, str] Extrahiert alle drei Kontexte aus einem abgeschnittenen JSON-String. - getContexts(truncatedJson: str) -> JsonContinuationContexts Gibt alle Kontexte als Pydantic-Modell zurück mit benannten Feldern. Modulkonstanten: - BUDGET_LIMIT: int = 500 Zeichen-Budget für vollständige Datenwerte im Hierarchy Context - OVERLAP_MAX_CHARS: int = 1000 Maximale Zeichen für den Overlap Context Verwendung: >>> from modules.shared.jsonContinuation import getContexts >>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor' >>> contexts = getContexts(jsonStr) >>> print(contexts.overlapContext) >>> print(contexts.hierarchyContext) >>> print(contexts.completePart) Autor: Claude Version: 2.0 """ from typing import Tuple, List, Optional, Any from dataclasses import dataclass from enum import Enum from modules.datamodels.datamodelAi import JsonContinuationContexts # ============================================================================= # MODULE CONSTANTS # ============================================================================= BUDGET_LIMIT: int = 500 """Zeichen-Budget für vollständige Datenwerte im Hierarchy Context""" OVERLAP_MAX_CHARS: int = 1000 """Maximale Zeichen für den Overlap Context""" # ============================================================================= # TOKEN TYPES AND DATA CLASSES # ============================================================================= class TokenType(Enum): """JSON Token Types""" OBJECT_START = "{" OBJECT_END = "}" ARRAY_START = "[" ARRAY_END = "]" STRING = "string" NUMBER = "number" BOOLEAN = "boolean" NULL = "null" COLON = ":" COMMA = "," KEY = "key" EOF = "eof" TRUNCATED = "truncated" @dataclass class Token: """Represents a JSON token with position info""" type: TokenType value: Any start_pos: int end_pos: int raw: str # Original string representation @dataclass class StackFrame: """Represents a level in the JSON hierarchy""" type: str # "object" or "array" start_pos: int key: Optional[str] = None # Current key for objects index: int = 0 # Current index for arrays content: str = "" # Accumulated content for this frame keys_seen: List[str] = None # Keys seen in this object def __post_init__(self): if self.keys_seen is None: self.keys_seen = [] class JsonTokenizer: """Tokenizer for potentially truncated JSON strings""" def __init__(self, jsonStr: str): self.jsonStr = jsonStr self.pos = 0 self.length = len(jsonStr) def skipWhitespace(self): """Skip whitespace characters""" while self.pos < self.length and self.jsonStr[self.pos] in ' \t\n\r': self.pos += 1 def peek(self) -> Optional[str]: """Peek at current character without consuming""" if self.pos < self.length: return self.jsonStr[self.pos] return None def readString(self) -> Token: """Read a JSON string token""" start_pos = self.pos self.pos += 1 # Skip opening quote escaped = False while self.pos < self.length: char = self.jsonStr[self.pos] if escaped: escaped = False self.pos += 1 elif char == '\\': escaped = True self.pos += 1 elif char == '"': self.pos += 1 raw = self.jsonStr[start_pos:self.pos] try: # Try to decode the string value value = raw[1:-1] # Remove quotes for value except: value = raw return Token(TokenType.STRING, value, start_pos, self.pos, raw) else: self.pos += 1 # String was truncated raw = self.jsonStr[start_pos:self.pos] return Token(TokenType.TRUNCATED, raw[1:] if len(raw) > 1 else "", start_pos, self.pos, raw) def readNumber(self) -> Token: """Read a JSON number token""" start_pos = self.pos # Handle negative if self.pos < self.length and self.jsonStr[self.pos] == '-': self.pos += 1 # Read digits while self.pos < self.length and self.jsonStr[self.pos].isdigit(): self.pos += 1 # Decimal part if self.pos < self.length and self.jsonStr[self.pos] == '.': self.pos += 1 while self.pos < self.length and self.jsonStr[self.pos].isdigit(): self.pos += 1 # Exponent if self.pos < self.length and self.jsonStr[self.pos] in 'eE': self.pos += 1 if self.pos < self.length and self.jsonStr[self.pos] in '+-': self.pos += 1 while self.pos < self.length and self.jsonStr[self.pos].isdigit(): self.pos += 1 raw = self.jsonStr[start_pos:self.pos] try: value = float(raw) if '.' in raw or 'e' in raw.lower() else int(raw) except ValueError: value = raw return Token(TokenType.NUMBER, value, start_pos, self.pos, raw) def readKeyword(self) -> Token: """Read true, false, or null""" start_pos = self.pos for keyword, token_type in [('true', TokenType.BOOLEAN), ('false', TokenType.BOOLEAN), ('null', TokenType.NULL)]: if self.jsonStr[self.pos:].startswith(keyword): self.pos += len(keyword) value = True if keyword == 'true' else (False if keyword == 'false' else None) return Token(token_type, value, start_pos, self.pos, keyword) # Partial keyword (truncated) while self.pos < self.length and self.jsonStr[self.pos].isalpha(): self.pos += 1 raw = self.jsonStr[start_pos:self.pos] return Token(TokenType.TRUNCATED, raw, start_pos, self.pos, raw) def nextToken(self) -> Token: """Get the next token""" self.skipWhitespace() if self.pos >= self.length: return Token(TokenType.EOF, None, self.pos, self.pos, "") char = self.jsonStr[self.pos] startPos = self.pos if char == '{': self.pos += 1 return Token(TokenType.OBJECT_START, '{', startPos, self.pos, '{') elif char == '}': self.pos += 1 return Token(TokenType.OBJECT_END, '}', startPos, self.pos, '}') elif char == '[': self.pos += 1 return Token(TokenType.ARRAY_START, '[', startPos, self.pos, '[') elif char == ']': self.pos += 1 return Token(TokenType.ARRAY_END, ']', startPos, self.pos, ']') elif char == ':': self.pos += 1 return Token(TokenType.COLON, ':', startPos, self.pos, ':') elif char == ',': self.pos += 1 return Token(TokenType.COMMA, ',', startPos, self.pos, ',') elif char == '"': return self.readString() elif char == '-' or char.isdigit(): return self.readNumber() elif char.isalpha(): return self.readKeyword() else: # Unknown character, treat as truncated self.pos += 1 return Token(TokenType.TRUNCATED, char, startPos, self.pos, char) @dataclass class HierarchyLevel: """Represents one level in the parsed hierarchy""" type: str # "object" or "array" start_pos: int end_pos: int # -1 if not closed key: Optional[str] # Key if this is a value in an object index: Optional[int] # Index if this is in an array content: dict # Parsed content at this level raw_start: str # Raw string from start to children children_content: List[Any] # For arrays: list of parsed elements def getJsonContinuationContext( truncatedJson: str, budgetLimit: Optional[int] = None, overlapMaxChars: Optional[int] = None ) -> Tuple[str, str, str]: """ Generate continuation contexts for a truncated JSON string. Generiert drei Kontexte für abgeschnittene JSON-Strings: 1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält 2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut mit Budget-Logik 3. Complete Part: Der vollständige Teil des JSONs mit allen Strukturen geschlossen Args: truncatedJson: The truncated JSON string budgetLimit: Character budget for data values in hierarchy context (uses BUDGET_LIMIT if None) overlapMaxChars: Maximum characters for overlap context (uses OVERLAP_MAX_CHARS if None) Returns: Tuple of (overlapContext, hierarchyContext, completePart): - overlapContext: The innermost object/element containing the cut (for merging) - hierarchyContext: Full structure from root to cut with budget-limited values - completePart: Valid JSON with all structures properly closed """ if budgetLimit is None: budgetLimit = BUDGET_LIMIT if overlapMaxChars is None: overlapMaxChars = OVERLAP_MAX_CHARS analyzer = JsonAnalyzer(truncatedJson, budgetLimit, overlapMaxChars) return analyzer.analyze() class JsonAnalyzer: """ Analyzes truncated JSON and generates continuation contexts. Generates three contexts for truncated JSON strings: 1. Overlap Context: The innermost object/array element containing the cut point 2. Hierarchy Context: The hierarchical structure from root to cut with budget logic 3. Complete Part: The complete part of the JSON with all structures properly closed """ def __init__(self, jsonStr: str, budgetLimit: Optional[int] = None, overlapMaxChars: Optional[int] = None): self.jsonStr = jsonStr self.budgetLimit = budgetLimit if budgetLimit is not None else BUDGET_LIMIT self.overlapMaxChars = overlapMaxChars if overlapMaxChars is not None else OVERLAP_MAX_CHARS self.stack: List[StackFrame] = [] self.hierarchy: List[dict] = [] # Parsed hierarchy info def analyze(self) -> Tuple[str, str, str]: """ Analyze the truncated JSON and return all three contexts. Returns: Tuple of (overlapContext, hierarchyContext, completePart) """ # Parse and track the structure self._parseStructure() # Generate overlap context overlapContext = self._generateOverlapContext() # Generate hierarchy context (use improved version) hierarchyContext = self._renderWithBudgetV2() # Generate complete part (JSON with all structures closed) completePart = self._generateCompletePart() return overlapContext, hierarchyContext, completePart def _generateCompletePart(self) -> str: """ Generate the complete part of the JSON with all structures properly closed. This creates valid JSON by closing all open strings, brackets/braces. Unvollständige Keys werden entfernt, damit das Ergebnis valides JSON ist. Unvollständige Keywords (true, false, null) werden vervollständigt. Strategy: 1. Take the full truncated JSON 2. If we're in the middle of a string, close it 3. Complete incomplete keywords (tr → true, f → false, n → null) 4. Remove incomplete key-value pairs (keys without values) 5. Close all open brackets/braces """ result = self.jsonStr.rstrip() # Remove trailing comma if present (after stripping) if result.endswith(','): result = result[:-1] # Check if we need to close an open string stringClosing = self._getStringClosing(result) result += stringClosing # Complete incomplete keywords (true, false, null) result = self._completeIncompleteKeywords(result) # Check if we're in the middle of a key (after colon) # If string was just closed and we're after a colon with no value, remove the key result = self._cleanIncompleteKeyValue(result) # Close all open structures closingBrackets = self._getClosingBrackets(result) return result + closingBrackets def _getStringClosing(self, jsonStr: str) -> str: """Check if there's an unclosed string and return closing quote if needed.""" in_string = False escaped = False for char in jsonStr: if escaped: escaped = False continue if char == '\\' and in_string: escaped = True continue if char == '"': in_string = not in_string return '"' if in_string else "" def _cleanIncompleteKeyValue(self, jsonStr: str) -> str: """ Clean up incomplete key-value pairs. Handles cases like: - {"key": "incompl -> keep (valid truncated value) - {"key": -> remove key - {"a": 1, "key -> remove incomplete key (was in middle of key name) """ stripped = jsonStr.rstrip() # Pattern: ends with colon (possibly with whitespace) - incomplete value if stripped.endswith(':'): # Find the start of this key and remove the whole key-value return self._removeLastKey(stripped) # Check if we just closed a string that was an incomplete key # Pattern: ..., "something" or { "something" where something has no colon after # This happens when we close a truncated key name like "add" -> "add" if stripped.endswith('"'): # Look for the pattern: comma/bracket + whitespace + "string" # and check if this was supposed to be a key if self._isIncompleteKey(stripped): return self._removeLastKey(stripped) return jsonStr def _completeIncompleteKeywords(self, jsonStr: str) -> str: """ Complete incomplete JSON keywords at the end of the string. Checks the last element for incomplete keywords after colon: - ": t*" or ": f*" or ": n*" -> complete to true/false/null - ": " or ":" (without keyword) -> set to null """ result = jsonStr.rstrip() # Find the last colon (not in string) in_string = False escaped = False last_colon_pos = -1 for i in range(len(result) - 1, -1, -1): char = result[i] if escaped: escaped = False continue if char == '\\' and in_string: escaped = True continue if char == '"': in_string = not in_string continue if not in_string and char == ':': last_colon_pos = i break if last_colon_pos < 0: return result # Get text after the last colon after_colon = result[last_colon_pos + 1:].strip() # Check for incomplete keyword patterns if after_colon.startswith('t') or after_colon.startswith('T'): # Incomplete true keyword_start = last_colon_pos + 1 # Skip whitespace while keyword_start < len(result) and result[keyword_start] in ' \t\n\r': keyword_start += 1 # Remove partial keyword keyword_end = keyword_start + 1 while keyword_end < len(result) and result[keyword_end].isalpha(): keyword_end += 1 return result[:keyword_start] + 'true' + result[keyword_end:] elif after_colon.startswith('f') or after_colon.startswith('F'): # Incomplete false keyword_start = last_colon_pos + 1 while keyword_start < len(result) and result[keyword_start] in ' \t\n\r': keyword_start += 1 keyword_end = keyword_start + 1 while keyword_end < len(result) and result[keyword_end].isalpha(): keyword_end += 1 return result[:keyword_start] + 'false' + result[keyword_end:] elif after_colon.startswith('n') or after_colon.startswith('N'): # Incomplete null keyword_start = last_colon_pos + 1 while keyword_start < len(result) and result[keyword_start] in ' \t\n\r': keyword_start += 1 keyword_end = keyword_start + 1 while keyword_end < len(result) and result[keyword_end].isalpha(): keyword_end += 1 return result[:keyword_start] + 'null' + result[keyword_end:] elif not after_colon or after_colon == '': # No keyword after colon -> set to null return result + 'null' return result def _isIncompleteKey(self, jsonStr: str) -> bool: """ Check if the last string in the JSON is an incomplete key in an object. This happens when truncation occurred in the middle of a key name. Only applies to objects, not arrays. """ # Find the last complete string pos = len(jsonStr) - 1 if jsonStr[pos] != '"': return False # Find the opening quote of this string stringStart = pos - 1 while stringStart >= 0: if jsonStr[stringStart] == '"': # Check it's not escaped numBackslashes = 0 checkPos = stringStart - 1 while checkPos >= 0 and jsonStr[checkPos] == '\\': numBackslashes += 1 checkPos -= 1 if numBackslashes % 2 == 0: break stringStart -= 1 if stringStart < 0: return False # Now stringStart points to opening quote # Check what's before it (skip whitespace) beforePos = stringStart - 1 while beforePos >= 0 and jsonStr[beforePos] in ' \t\n\r': beforePos -= 1 if beforePos < 0: return False # For this to be an incomplete key, it must be preceded by { or , # AND we must be inside an object (not an array) if jsonStr[beforePos] not in ',{': return False # Now check if we're in an object context (not array) # Count open braces/brackets to determine context braceCount = 0 bracketCount = 0 inString = False for i in range(beforePos + 1): char = jsonStr[i] if char == '"' and (i == 0 or jsonStr[i-1] != '\\'): inString = not inString elif not inString: if char == '{': braceCount += 1 elif char == '}': braceCount -= 1 elif char == '[': bracketCount += 1 elif char == ']': bracketCount -= 1 # If we have more open braces than brackets at this point, # we're in an object context # Actually, we need to check the innermost container # Let's track the stack properly stack = [] inString = False for i in range(beforePos + 1): char = jsonStr[i] if char == '"' and (i == 0 or jsonStr[i-1] != '\\'): inString = not inString elif not inString: if char == '{': stack.append('object') elif char == '[': stack.append('array') elif char == '}': if stack and stack[-1] == 'object': stack.pop() elif char == ']': if stack and stack[-1] == 'array': stack.pop() # If innermost container is an object, this is an incomplete key return len(stack) > 0 and stack[-1] == 'object' def _removeLastKey(self, jsonStr: str) -> str: """Remove the last incomplete key-value pair from the JSON string.""" stripped = jsonStr.rstrip() # Find the last comma or opening bracket before the incomplete key pos = len(stripped) - 1 # Skip past the current string/key in_string = False while pos >= 0: char = stripped[pos] if char == '"' and (pos == 0 or stripped[pos-1] != '\\'): in_string = not in_string if not in_string and char in ',{': break pos -= 1 if pos < 0: return stripped if stripped[pos] == ',': # Remove from comma onwards return stripped[:pos] elif stripped[pos] == '{': # Keep the opening brace return stripped[:pos+1] return stripped def _findLastCompletePosition(self) -> int: """Find the position of the last complete value in the JSON.""" tokenizer = JsonTokenizer(self.jsonStr) last_complete_pos = 0 stack_depth = 0 last_value_end = 0 in_value = False while True: token = tokenizer.nextToken() if token.type == TokenType.EOF: break if token.type == TokenType.TRUNCATED: # Return position before the truncated part break if token.type in (TokenType.OBJECT_START, TokenType.ARRAY_START): stack_depth += 1 in_value = True elif token.type in (TokenType.OBJECT_END, TokenType.ARRAY_END): stack_depth -= 1 last_value_end = token.end_pos in_value = False elif token.type == TokenType.STRING: # Check if this is a key or a value saved_pos = tokenizer.pos tokenizer.skipWhitespace() next_char = tokenizer.peek() tokenizer.pos = saved_pos if next_char != ':': # It's a value last_value_end = token.end_pos in_value = False elif token.type in (TokenType.NUMBER, TokenType.BOOLEAN, TokenType.NULL): last_value_end = token.end_pos in_value = False elif token.type == TokenType.COMMA: # After a comma, we've completed a value last_complete_pos = last_value_end # Return the last complete position return last_value_end if last_value_end > 0 else len(self.jsonStr) def _getClosingBrackets(self, jsonStr: str) -> str: """Determine what closing brackets are needed.""" stack = [] in_string = False escaped = False for char in jsonStr: if escaped: escaped = False continue if char == '\\' and in_string: escaped = True continue if char == '"': in_string = not in_string continue if in_string: continue if char == '{': stack.append('}') elif char == '[': stack.append(']') elif char == '}': if stack and stack[-1] == '}': stack.pop() elif char == ']': if stack and stack[-1] == ']': stack.pop() # Return closing brackets in reverse order return ''.join(reversed(stack)) def _parseStructure(self): """Parse the JSON structure and track hierarchy""" tokenizer = JsonTokenizer(self.jsonStr) while True: token = tokenizer.nextToken() if token.type == TokenType.EOF or token.type == TokenType.TRUNCATED: break if token.type == TokenType.OBJECT_START: frame = StackFrame( type="object", start_pos=token.start_pos, keys_seen=[] ) self.stack.append(frame) elif token.type == TokenType.ARRAY_START: frame = StackFrame( type="array", start_pos=token.start_pos, index=0 ) self.stack.append(frame) elif token.type == TokenType.OBJECT_END: if self.stack and self.stack[-1].type == "object": self.stack.pop() elif token.type == TokenType.ARRAY_END: if self.stack and self.stack[-1].type == "array": self.stack.pop() elif token.type == TokenType.STRING: # Could be a key or a value self._handleStringToken(token, tokenizer) elif token.type == TokenType.COMMA: # Increment array index if self.stack and self.stack[-1].type == "array": self.stack[-1].index += 1 def _handleStringToken(self, token: Token, tokenizer: JsonTokenizer): """Handle a string token (could be key or value)""" if self.stack and self.stack[-1].type == "object": # Check if this is a key (followed by colon) saved_pos = tokenizer.pos tokenizer.skipWhitespace() next_char = tokenizer.peek() if next_char == ':': # This is a key self.stack[-1].key = token.value self.stack[-1].keys_seen.append(token.value) tokenizer.pos = saved_pos def _generateOverlapContext(self) -> str: """ Generate the overlap context - the innermost object/array element containing the cut. Returns the raw string from the start of that element to the end of the truncated JSON. Dieser Kontext wird verwendet, um den abgeschnittenen Teil mit dem neuen Teil zu mergen. Exakt so wie im Original-String (für String-Matching beim Merge). SPECIAL CASE: If cut point is within a list item, return only: - The broken list item (containing the cut) - The list item before it (if available) This avoids returning the entire list when only a single item is broken. """ if not self.stack: # No structure, return last overlap_max_chars characters return self.jsonStr[-self.overlapMaxChars:] # Find the innermost container that should be the overlap innermost = self.stack[-1] # SPECIAL CASE: If innermost is an array and cut is within a list item, # return only the broken item and the previous item (if available) if innermost.type == "array": overlap_start = self._findOverlapStartForArray(innermost) else: # For objects, use the standard logic overlap_start = self._findInnermostElementStart() overlap = self.jsonStr[overlap_start:] # Apply max chars limit if len(overlap) > self.overlapMaxChars: overlap = self.jsonStr[-self.overlapMaxChars:] return overlap def _findOverlapStartForArray(self, arrayFrame: StackFrame) -> int: """ Find overlap start for array: return only the broken list item and previous item. If cut point is within a list item, returns start of previous item (if available), otherwise returns start of current (broken) item. """ # Find all element start positions in the array element_starts = self._findAllArrayElementStarts(arrayFrame) if not element_starts: # No elements found, fall back to array start return arrayFrame.start_pos current_index = arrayFrame.index # If we're at index 0, there's no previous item - return current item start if current_index == 0: return element_starts[0] # If current_index is beyond known elements, use last known element if current_index >= len(element_starts): # Return start of second-to-last element (previous to last) if len(element_starts) >= 2: return element_starts[-2] # Previous item else: return element_starts[0] # Only one item, return it # Return start of previous item (current_index - 1) return element_starts[current_index - 1] def _findAllArrayElementStarts(self, arrayFrame: StackFrame) -> List[int]: """Find all element start positions in an array""" arrayContent = self.jsonStr[arrayFrame.start_pos:] # Skip the opening bracket and whitespace pos = 1 while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r': pos += 1 elementStarts = [arrayFrame.start_pos + pos] depth = 0 inString = False escaped = False i = pos while i < len(arrayContent): char = arrayContent[i] if escaped: escaped = False i += 1 continue if char == '\\' and inString: escaped = True i += 1 continue if char == '"': inString = not inString i += 1 continue if inString: i += 1 continue if char in '{[': depth += 1 elif char in '}]': depth -= 1 elif char == ',' and depth == 0: # Found element boundary i += 1 # Skip whitespace while i < len(arrayContent) and arrayContent[i] in ' \t\n\r': i += 1 elementStarts.append(arrayFrame.start_pos + i) i += 1 return elementStarts def _findInnermostElementStart(self) -> int: """Find the start position of the innermost element for overlap""" if not self.stack: return max(0, len(self.jsonStr) - self.overlapMaxChars) # Walk through stack to find the innermost array element or object # We want the innermost "atomic" unit that contains the cut # Strategy: # - If innermost is an object: return its start # - If innermost is an array: # - If current element is an object/array: return start of that element # - If current element is a primitive: return start of array or last N chars innermost = self.stack[-1] if innermost.type == "object": return innermost.start_pos else: # It's an array - find the start of the current element element_start = self._findArrayElementStart(innermost) # Check if the element is a primitive or complex type element_content = self.jsonStr[element_start:].strip() # If it starts with { or [ it's complex, return the element start if element_content and element_content[0] in '{[': return element_start else: # Primitive in array - check if there's a parent object # or return overlap_max_chars from end for i in range(len(self.stack) - 2, -1, -1): if self.stack[i].type == "object": return self.stack[i].start_pos # No parent object, return max chars from end return max(0, len(self.jsonStr) - self.overlapMaxChars) def _findArrayElementStart(self, arrayFrame: StackFrame) -> int: """Find the start position of the current array element""" # We need to find the start of the current element in the array # Parse from array start to find element boundaries arrayContent = self.jsonStr[arrayFrame.start_pos:] # Skip the opening bracket and whitespace pos = 1 while pos < len(arrayContent) and arrayContent[pos] in ' \t\n\r': pos += 1 elementStarts = [arrayFrame.start_pos + pos] depth = 0 inString = False escaped = False i = pos while i < len(arrayContent): char = arrayContent[i] if escaped: escaped = False i += 1 continue if char == '\\' and inString: escaped = True i += 1 continue if char == '"': inString = not inString i += 1 continue if inString: i += 1 continue if char in '{[': depth += 1 elif char in '}]': depth -= 1 elif char == ',' and depth == 0: # Found element boundary i += 1 # Skip whitespace while i < len(arrayContent) and arrayContent[i] in ' \t\n\r': i += 1 elementStarts.append(arrayFrame.start_pos + i) i += 1 # Return the start of the current element if arrayFrame.index < len(elementStarts): return elementStarts[arrayFrame.index] elif elementStarts: return elementStarts[-1] else: return arrayFrame.start_pos def _generateHierarchyContext(self) -> str: """ Generate the hierarchy context with budget logic. Shows structure from root to cut point with data values limited by budget. """ if not self.stack: # No structure return self.jsonStr[-self.overlapMaxChars:] # We need to rebuild the JSON with budget logic # Priority: elements closer to cut get full values, distant ones get "..." return self._rebuildWithBudget() def _rebuildWithBudget(self) -> str: """Rebuild JSON from root to cut with budget constraints""" # Strategy: # 1. Parse the JSON structure tracking all values # 2. Calculate total value size # 3. Apply budget from cut backwards # 4. Render with "..." for values outside budget # First, get a structured representation structure = self._parseForHierarchy() # Now render with budget return self._renderWithBudget(structure) def _parseForHierarchy(self) -> dict: """Parse JSON into a structure suitable for hierarchy rendering""" result = { 'type': 'root', 'children': [], 'raw_positions': [] } tokenizer = JsonTokenizer(self.jsonStr) stack = [result] current_key = None while True: token = tokenizer.nextToken() if token.type == TokenType.EOF: break if token.type == TokenType.TRUNCATED: # Mark the truncation point if stack: current = stack[-1] if current.get('type') == 'object': if current_key: current['children'].append({ 'type': 'truncated_value', 'key': current_key, 'raw': self.jsonStr[token.start_pos:], 'start_pos': token.start_pos }) elif current.get('type') == 'array': current['children'].append({ 'type': 'truncated_value', 'raw': self.jsonStr[token.start_pos:], 'start_pos': token.start_pos }) break if token.type == TokenType.OBJECT_START: obj = { 'type': 'object', 'key': current_key, 'children': [], 'start_pos': token.start_pos } if stack: stack[-1]['children'].append(obj) stack.append(obj) current_key = None elif token.type == TokenType.ARRAY_START: arr = { 'type': 'array', 'key': current_key, 'children': [], 'start_pos': token.start_pos } if stack: stack[-1]['children'].append(arr) stack.append(arr) current_key = None elif token.type == TokenType.OBJECT_END: if len(stack) > 1 and stack[-1].get('type') == 'object': stack[-1]['end_pos'] = token.end_pos stack[-1]['complete'] = True stack.pop() elif token.type == TokenType.ARRAY_END: if len(stack) > 1 and stack[-1].get('type') == 'array': stack[-1]['end_pos'] = token.end_pos stack[-1]['complete'] = True stack.pop() elif token.type == TokenType.STRING: # Check if it's a key saved_pos = tokenizer.pos tokenizer.skipWhitespace() next_char = tokenizer.peek() if next_char == ':' and stack and stack[-1].get('type') == 'object': current_key = token.value else: # It's a value value_node = { 'type': 'value', 'key': current_key, 'value': token.value, 'raw': token.raw, 'start_pos': token.start_pos, 'end_pos': token.end_pos, 'value_type': 'string' } if stack: stack[-1]['children'].append(value_node) current_key = None tokenizer.pos = saved_pos elif token.type in (TokenType.NUMBER, TokenType.BOOLEAN, TokenType.NULL): value_node = { 'type': 'value', 'key': current_key, 'value': token.value, 'raw': token.raw, 'start_pos': token.start_pos, 'end_pos': token.end_pos, 'value_type': str(token.type.value) } if stack: stack[-1]['children'].append(value_node) current_key = None return result def _renderWithBudget(self, structure: dict) -> str: """Render the structure with budget constraints""" # First, collect all value nodes with their distances from cut cutPos = len(self.jsonStr) allValues = self._collectValuesWithDistance(structure, cutPos) # Sort by distance (closest to cut first) allValues.sort(key=lambda x: x['distance']) # Determine which values get full rendering budgetRemaining = self.budgetLimit valuesWithBudget = set() for valInfo in allValues: valSize = len(str(valInfo['raw'])) if budgetRemaining >= valSize: valuesWithBudget.add(valInfo['id']) budgetRemaining -= valSize # Now render the structure return self._renderNode(structure, valuesWithBudget, indent=0) def _collectValuesWithDistance(self, node: dict, cutPos: int, depth: int = 0) -> list: """Collect all value nodes with their distance from cut point""" values = [] if node.get('type') == 'value': endPos = node.get('end_pos', cutPos) distance = cutPos - endPos values.append({ 'id': id(node), 'node': node, 'distance': distance, 'raw': node.get('raw', ''), 'depth': depth }) elif node.get('type') == 'truncated_value': values.append({ 'id': id(node), 'node': node, 'distance': 0, # Truncated values are at the cut 'raw': node.get('raw', ''), 'depth': depth }) for child in node.get('children', []): values.extend(self._collectValuesWithDistance(child, cutPos, depth + 1)) return values def _renderNode(self, node: dict, valuesWithBudget: set, indent: int = 0) -> str: """Render a node with budget constraints""" indent_str = " " * indent node_type = node.get('type') if node_type == 'root': parts = [] for child in node.get('children', []): parts.append(self._renderNode(child, valuesWithBudget, indent)) return '\n'.join(parts) elif node_type == 'object': return self._renderObject(node, valuesWithBudget, indent) elif node_type == 'array': return self._renderArray(node, valuesWithBudget, indent) elif node_type == 'value': return self._renderValue(node, valuesWithBudget, indent) elif node_type == 'truncated_value': return node.get('raw', '') return '' def _renderObject(self, node: dict, valuesWithBudget: set, indent: int) -> str: """Render an object node""" indent_str = " " * indent inner_indent = " " * (indent + 1) key_prefix = "" if node.get('key'): key_prefix = f'"{node["key"]}": ' if not node.get('children'): if node.get('complete'): return f"{key_prefix}{{}}" else: return f"{key_prefix}{{" parts = [f"{key_prefix}{{"] children = node.get('children', []) for i, child in enumerate(children): child_rendered = self._renderNode(child, valuesWithBudget, indent + 1) # Add comma if not last and next sibling exists if i < len(children) - 1: if child.get('type') != 'truncated_value': parts.append(f"{inner_indent}{child_rendered},") else: parts.append(f"{inner_indent}{child_rendered}") else: parts.append(f"{inner_indent}{child_rendered}") if node.get('complete'): parts.append(f"{indent_str}}}") return '\n'.join(parts) def _renderArray(self, node: dict, valuesWithBudget: set, indent: int) -> str: """Render an array node""" indent_str = " " * indent inner_indent = " " * (indent + 1) key_prefix = "" if node.get('key'): key_prefix = f'"{node["key"]}": ' if not node.get('children'): if node.get('complete'): return f"{key_prefix}[]" else: return f"{key_prefix}[" parts = [f"{key_prefix}["] children = node.get('children', []) for i, child in enumerate(children): child_rendered = self._renderNode(child, valuesWithBudget, indent + 1) if i < len(children) - 1: if child.get('type') != 'truncated_value': parts.append(f"{inner_indent}{child_rendered},") else: parts.append(f"{inner_indent}{child_rendered}") else: parts.append(f"{inner_indent}{child_rendered}") if node.get('complete'): parts.append(f"{indent_str}]") return '\n'.join(parts) def _renderValue(self, node: dict, valuesWithBudget: set, indent: int) -> str: """Render a value node""" key_prefix = "" if node.get('key'): key_prefix = f'"{node["key"]}": ' if id(node) in valuesWithBudget: # Full value default_raw = '"...\"' raw_value = node.get('raw', default_raw) return f"{key_prefix}{raw_value}" else: # Placeholder return f'{key_prefix}"..."' def _renderWithBudgetV2(self) -> str: """ Generate hierarchy context with budget logic. Alternative rendering that stays closer to the original truncated string. Shows full context near the cut, replaces distant values with "...". Budget-Logik: 1. Sammeln: Alle String-Werte werden mit ihrer Position gesammelt 2. Sortieren: Nach Entfernung zum Cut-Punkt (näher = höhere Priorität) 3. Zuweisen: Budget wird von hinten nach vorne aufgebraucht 4. Ersetzen: Werte außerhalb des Budgets werden durch "..." ersetzt """ # Parse to understand structure, but render from original string with modifications structure = self._parseForHierarchy() # Collect all complete value nodes with positions allValues = self._collectCompleteValues(structure) # Sort by end position (furthest from cut = first to be truncated) allValues.sort(key=lambda x: x['end_pos']) # Apply budget: replace values from the start until budget exhausted budgetUsed = 0 totalAvailable = sum(len(v['raw']) for v in allValues) valuesToReplace = [] for val in allValues: valSize = len(val['raw']) if totalAvailable - budgetUsed > self.budgetLimit: # This value should be replaced with "..." valuesToReplace.append(val) budgetUsed += valSize else: break # Build the modified string result = self.jsonStr # Replace from end to start to preserve positions valuesToReplace.sort(key=lambda x: x['start_pos'], reverse=True) for val in valuesToReplace: start = val['start_pos'] end = val['end_pos'] result = result[:start] + '"..."' + result[end:] return result def _collectCompleteValues(self, node: dict) -> list: """Collect all complete (non-truncated) value nodes""" values = [] if node.get('type') == 'value' and node.get('value_type') == 'string': values.append({ 'start_pos': node['start_pos'], 'end_pos': node['end_pos'], 'raw': node['raw'], 'key': node.get('key') }) for child in node.get('children', []): values.extend(self._collectCompleteValues(child)) return values def extractContinuationContexts( truncatedJson: str ) -> Tuple[str, str, str]: """ Main entry point: Extract all three continuation contexts from a truncated JSON. Generiert drei Kontexte für abgeschnittene JSON-Strings: 1. Overlap Context: Das innerste Objekt/Array-Element, das den Cut-Punkt enthält - Wird verwendet, um den abgeschnittenen Teil mit dem neuen Teil zu mergen - Exakt so wie im Original-String (für String-Matching beim Merge) 2. Hierarchy Context: Die hierarchische Struktur vom Root bis zum Cut-Punkt - Mit Budget-Logik: Näher am Cut = vollständige Werte, weiter weg = "..." Platzhalter - Gibt der AI den Kontext der gesamten JSON-Struktur 3. Complete Part: Der vollständige, valide JSON bis zum Cut-Punkt - Alle offenen Strukturen werden geschlossen (}, ], ") - Unvollständige Keys werden entfernt - Kann direkt als valides JSON geparst werden Uses module constants BUDGET_LIMIT and OVERLAP_MAX_CHARS. Args: truncatedJson: The truncated JSON string Returns: Tuple of (overlapContext, hierarchyContext, completePart): - overlapContext: The innermost object/element containing the cut (for merging) - hierarchyContext: Full structure from root to cut with budget-limited values - completePart: Valid JSON with all structures properly closed Example: >>> jsonStr = '{"users": [{"name": "John", "bio": "Hello Wor' >>> overlap, hierarchy, complete = extractContinuationContexts(jsonStr) >>> import json >>> parsed = json.loads(complete) # ✓ Funktioniert! """ return getJsonContinuationContext(truncatedJson) # Convenience function with named results def getContexts( truncatedJson: str ) -> JsonContinuationContexts: """ Get all contexts as a Pydantic model with named fields. Uses module constants BUDGET_LIMIT and OVERLAP_MAX_CHARS. Args: truncatedJson: The truncated JSON string Returns: JsonContinuationContexts Pydantic model with: - overlapContext: The innermost object/element containing the cut - hierarchyContext: Full structure with budget-limited values - completePart: Valid JSON with all structures properly closed Example: >>> json_str = '{"users": [{"name": "John", "bio": "Hello Wor' >>> contexts = getContexts(json_str) >>> print(contexts.overlapContext) >>> print(contexts.hierarchyContext) >>> print(contexts.completePart) """ overlap, hierarchy, completePart = extractContinuationContexts(truncatedJson) return JsonContinuationContexts( overlapContext=overlap, hierarchyContext=hierarchy, completePart=completePart )