gateway/modules/shared/jsonUtils.py
ValueOn AG 64590aa61e fixes
2026-01-04 20:01:34 +01:00

2631 lines
110 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple, Union, Type, TypeVar
from pydantic import BaseModel, ValidationError
logger = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseModel)
def stripCodeFences(text: str) -> str:
    """Remove Markdown code fences and SOURCE wrapper tags around a payload.

    Handles, in this order:
    1. A leading ``[SOURCE: ...]`` tag (normally on its own line).
    2. A trailing ``[END SOURCE]`` tag.
    3. An opening fence such as ```` ```json ```` (with or without a closing fence).

    Args:
        text: Raw text, typically an LLM response.

    Returns:
        The text without wrapper tags and fences, trimmed of surrounding
        whitespace. Falsy input (``""`` / ``None``) is returned unchanged.
    """
    if not text:
        return text

    s = text.strip()

    # Remove a leading [SOURCE: ...] tag.
    if s.startswith("[SOURCE:"):
        newlinePos = s.find("\n")
        if newlinePos != -1:
            # Usual case: the tag occupies the whole first line.
            s = s[newlinePos + 1:]
        else:
            # Single-line input: keep whatever follows the tag instead of
            # discarding the payload together with the tag.
            closePos = s.find("]")
            if closePos == -1:
                return ""
            s = s[closePos + 1:]
        # Re-trim so that a fence preceded by blank lines is still detected.
        s = s.strip()

    # Remove a trailing [END SOURCE] tag, whether or not it is on its own line.
    endTag = "[END SOURCE]"
    if s.endswith(endTag):
        s = s[:-len(endTag)].strip()

    # Handle opening fence (may or may not have a closing fence).
    if s.startswith("```"):
        # Skip the optional language tag (e.g. 'json') up to the first newline.
        newlinePos = s.find("\n", 3)
        if newlinePos != -1:
            s = s[newlinePos + 1:]
        # Strip the closing fence if present.
        if s.endswith("```"):
            s = s[:-3]
        return s.strip()

    return s
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring of *text*.

    Scanning starts at the first ``{`` or ``[``. Braces and brackets that
    occur inside double-quoted JSON strings (including after escaped quotes)
    are ignored, so values such as ``"}"`` do not terminate the match early.

    Args:
        text: Text that may contain a JSON value surrounded by other content.

    Returns:
        The balanced substring, or the trimmed input when no opener exists or
        the structure never balances. Falsy input is returned unchanged.
    """
    if not text:
        return text

    s = text.strip()

    # Locate the earliest opener of either kind.
    openerPositions = [pos for pos in (s.find('{'), s.find('[')) if pos != -1]
    if not openerPositions:
        return s
    start = min(openerPositions)

    stack = []
    inString = False
    escaped = False
    for i in range(start, len(s)):
        ch = s[i]

        if inString:
            # Inside a string only the closing quote matters; a backslash
            # protects the character that follows it.
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                inString = False
            continue

        if ch == '"':
            inString = True
        elif ch in '{[':
            stack.append(ch)
        elif ch in '}]':
            if not stack:
                continue
            opener = stack.pop()
            # A mismatched closer consumes the opener but never ends the match.
            if (opener == '{' and ch != '}') or (opener == '[' and ch != ']'):
                continue
            if not stack:
                return s[start:i + 1].strip()

    return s
def normalizeJsonText(text: str) -> str:
    """Light normalization: remove a leading BOM and normalize smart quotes.

    The typographic quote characters are written as explicit Unicode escapes
    so they survive editors and transports that drop or mangle non-ASCII
    characters. (An empty search string in ``str.replace`` would insert the
    replacement between every character.)

    Args:
        text: Raw text to normalize.

    Returns:
        The text without a leading U+FEFF and with U+201C/U+201D replaced by
        ``"`` and U+2018/U+2019 replaced by ``'``. Falsy input is returned
        unchanged.
    """
    if not text:
        return text

    # Remove UTF-8 BOM (U+FEFF) if present at the start.
    s = text.lstrip('\ufeff')

    # Normalize smart quotes to straight quotes.
    s = (
        s.replace('\u201c', '"')   # left double quotation mark
        .replace('\u201d', '"')    # right double quotation mark
        .replace('\u2018', "'")    # left single quotation mark
        .replace('\u2019', "'")    # right single quotation mark
    )
    return s
def extractJsonString(text: str) -> str:
    """Produce a clean JSON candidate string from raw text.

    Pipeline: normalizeJsonText -> stripCodeFences -> extractFirstBalancedJson,
    followed by a final whitespace trim.
    """
    normalized = normalizeJsonText(text)
    unfenced = stripCodeFences(normalized)
    candidate = extractFirstBalancedJson(unfenced)
    return candidate.strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Extract and parse JSON without raising.

    Args:
        text: JSON-bearing text; bytes are decoded as UTF-8 with replacement
            of undecodable sequences.

    Returns:
        A ``(obj, error, cleaned_str)`` tuple. On success ``obj`` is whatever
        ``json.loads`` produced and ``error`` is None; on failure ``obj`` is
        None and ``error`` is the caught exception. ``cleaned_str`` is the
        string handed to the parser in both cases.
    """
    if isinstance(text, bytes):
        try:
            text = text.decode('utf-8', errors='replace')
        except Exception:
            text = str(text)

    cleaned = extractJsonString(text or "")

    try:
        parsed = json.loads(cleaned)
    except Exception as parseError:
        return None, parseError, cleaned
    return parsed, None, cleaned
def repairBrokenJson(text: str) -> Optional[Dict[str, Any]]:
    """
    Attempt to repair broken JSON using multiple strategies.

    Strategies, tried in order:
    1. Structure closing: close open strings/brackets without truncating.
    2. Section extraction via _extractSectionsRegex, wrapped in a fresh
       document envelope.
    3. Progressive parsing: shorten the input until a prefix parses
       (TRUNCATES DATA), then try to recover sections from the discarded tail.

    IMPORTANT: This function tries to preserve ALL data by avoiding truncation.
    Truncation is only used as a last resort.

    Args:
        text: The (possibly broken or incomplete) JSON text.

    Returns:
        The repaired dict, or None if the input is empty or all strategies fail.
    """
    if not text:
        return None

    # Strategy 1: Structure closing - close incomplete structures WITHOUT truncating
    closedStr = closeJsonStructures(text)
    obj, err, _ = tryParseJson(closedStr)
    if err is None and isinstance(obj, dict):
        sections = extractSectionsFromDocument(obj)
        if sections:
            logger.info(f"Repaired JSON using structure closing (preserved all data, found {len(sections)} sections)")
        else:
            # Structure closing worked but no sections found - still return it
            logger.info("Repaired JSON using structure closing (preserved all data, but no sections found)")
        return obj

    # Strategy 2: Try to extract sections from the entire text
    # NOTE: _extractSectionsRegex may truncate, but we try it before progressive parsing
    extractedSections = _extractSectionsRegex(text)
    if extractedSections:
        logger.info(f"Extracted {len(extractedSections)} sections using regex")
        return {
            "metadata": {
                "split_strategy": "single_document",
                "source_documents": [],
                "extraction_method": "ai_generation"
            },
            "documents": [{"sections": extractedSections}]
        }

    # Strategy 3: Progressive parsing - find a valid prefix (TRUNCATES DATA)
    # WARNING: This strategy loses data after the truncation point
    logger.warning("Structure closing and regex extraction failed, trying progressive parsing (WILL TRUNCATE DATA)")
    bestResult = None
    bestValidLength = 0
    # Every step grid starts at len(text) and the finer grids contain the
    # coarser ones, so remember lengths that already failed instead of
    # re-closing and re-parsing them. The full length failed in Strategy 1.
    failedLengths = {len(text)}
    for stepSize in [100, 50, 10, 1]:
        for i in range(len(text), 0, -stepSize):
            if i in failedLengths:
                continue
            closedStr = closeJsonStructures(text[:i])
            obj, err, _ = tryParseJson(closedStr)
            if err is None and isinstance(obj, dict):
                bestResult = obj
                bestValidLength = i
                logger.debug(f"Progressive parsing success at length {i} (step: {stepSize}) - DATA TRUNCATED AT POSITION {i}")
                break
            failedLengths.add(i)
        if bestResult:
            break

    if not bestResult:
        logger.warning("All repair strategies failed")
        return None

    logger.warning(f"Repaired JSON using progressive parsing (valid length: {bestValidLength}, DATA LOST AFTER THIS POINT)")
    sections = extractSectionsFromDocument(bestResult)
    if sections:
        logger.info(f"Progressive parsing found {len(sections)} sections")
        return bestResult

    # No sections in the valid prefix - try to recover some from the tail
    logger.info("Progressive parsing found no sections, trying to extract from broken part")
    extractedSections = _extractSectionsRegex(text[bestValidLength:])
    if extractedSections:
        logger.info(f"Extracted {len(extractedSections)} sections from broken part")
        # Merge with the valid part. The parsed prefix may have any shape
        # (missing "documents", a non-list value, a first document without a
        # "sections" list), so normalize before extending instead of indexing
        # blindly.
        documents = bestResult.get("documents")
        if not isinstance(documents, list) or not documents:
            documents = [{"sections": []}]
            bestResult["documents"] = documents
        firstDoc = documents[0]
        if not isinstance(firstDoc, dict):
            # Keep the unexpected entry; put a proper document in front of it.
            firstDoc = {"sections": []}
            documents.insert(0, firstDoc)
        targetSections = firstDoc.get("sections")
        if not isinstance(targetSections, list):
            targetSections = []
            firstDoc["sections"] = targetSections
        targetSections.extend(extractedSections)
    # Return the valid prefix even when nothing was recovered from the tail.
    return bestResult
def closeJsonStructures(text: str) -> str:
    """
    Close incomplete JSON structures generically and correctly.

    Approach (single left-to-right scan):
    1. Track whether the scan is inside a string, honouring backslash escapes,
       and record every opened object/array on a stack.
    2. If the text ends inside a string, terminate it. A dangling trailing
       backslash is dropped first so it cannot escape the added quote.
    3. Close remaining structures in reverse order (last opened, first closed),
       removing a trailing comma directly before each added closer.

    Counting raw quote characters is deliberately avoided: escaped quotes
    (``\\"``) inside string values make quote parity unreliable.

    Args:
        text: Possibly truncated JSON text.

    Returns:
        The text with open strings/objects/arrays closed. Falsy input is
        returned unchanged.
    """
    if not text:
        return text

    # Steps 1: scan once, tracking string state and open structures
    stack = []
    inString = False
    escapeNext = False
    for char in text:
        if escapeNext:
            escapeNext = False
            continue
        if char == '\\':
            escapeNext = True
            continue
        if char == '"':
            inString = not inString
            continue
        # Only track braces/brackets outside of strings
        if not inString:
            if char == '{':
                stack.append('}')
            elif char == '[':
                stack.append(']')
            elif char == '}' or char == ']':
                # Pop only when the closer matches the innermost open structure
                if stack and stack[-1] == char:
                    stack.pop()

    result = text

    # Step 2: close an unterminated string
    if inString:
        if escapeNext:
            # Input was cut right after a backslash; the escape is incomplete
            result = result[:-1]
        result += '"'

    # Step 3: close remaining structures in reverse order (LIFO)
    while stack:
        closingChar = stack.pop()
        result = result.rstrip()
        # Remove trailing comma if present (invalid before a closer)
        if result and result[-1] == ',':
            result = result[:-1].rstrip()
        result += closingChar

    return result
def _extractSectionsRegex(text: str) -> List[Dict[str, Any]]:
    """
    Extract sections from broken/incomplete JSON using structural parsing.

    Despite the name, the primary strategies repair and parse the JSON; regex
    extraction (_extractGenericContent) is only the final fallback.

    NOTE: This function is called FROM repairBrokenJson, so it must NOT call
    repairBrokenJson (circular dependency). It implements its own strategies:
    1. Structure closing on the full text (preserves all data).
    2. Progressive parsing of shrinking prefixes (TRUNCATES DATA).
    3. Parsing the first balanced JSON substring.
    4. Generic regex-based content extraction.

    Args:
        text: The (possibly broken or incomplete) JSON text.

    Returns:
        The extracted sections, or an empty list if nothing was found.
    """
    # Strategy 1: Try structure closing WITHOUT truncation first (preserves all data)
    closed_str = closeJsonStructures(text)
    obj, err, _ = tryParseJson(closed_str)
    if err is None and isinstance(obj, dict):
        extracted_sections = extractSectionsFromDocument(obj)
        if extracted_sections:
            logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections using structure closing (preserved all data)")
            return extracted_sections

    # Strategy 2: Progressive parsing to find a valid JSON prefix (TRUNCATES DATA)
    # WARNING: This truncates the input and loses data
    logger.debug("_extractSectionsRegex: Structure closing failed, trying progressive parsing (WILL TRUNCATE)")
    # All step grids start at len(text) and the finer grids contain the coarser
    # ones. A prefix length that yielded no sections once will never yield any,
    # so each length is closed and parsed at most once. The full length was
    # already covered by Strategy 1.
    tested_lengths = {len(text)}
    for step_size in [1000, 500, 100, 50, 10]:
        for i in range(len(text), 0, -step_size):
            if i in tested_lengths:
                continue
            tested_lengths.add(i)
            closed_str = closeJsonStructures(text[:i])
            obj, err, _ = tryParseJson(closed_str)
            if err is None and isinstance(obj, dict):
                extracted_sections = extractSectionsFromDocument(obj)
                if extracted_sections:
                    logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections using progressive parsing at length {i} (DATA TRUNCATED)")
                    return extracted_sections
                # A parsed prefix without sections is not kept: re-running
                # extractSectionsFromDocument on it later could only return
                # the same empty result.

    # Strategy 3: Try to find balanced JSON and parse it
    balanced_json_str = extractFirstBalancedJson(text)
    if balanced_json_str and balanced_json_str != text.strip():
        obj, err, _ = tryParseJson(balanced_json_str)
        if err is None and isinstance(obj, dict):
            extracted_sections = extractSectionsFromDocument(obj)
            if extracted_sections:
                logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections from balanced JSON")
                return extracted_sections

    # Strategy 4: Last resort - generic content extraction (only if nothing else worked)
    logger.debug("_extractSectionsRegex: All structural parsing failed, trying generic content extraction")
    sections = _extractGenericContent(text)
    if sections:
        logger.debug(f"_extractSectionsRegex: Generic content extraction found {len(sections)} sections")
    return sections
def _removeLastIncompleteItem(items: List[str], original_text: str) -> List[str]:
"""
Remove the last item if it appears to be incomplete/corrupted.
This prevents corrupted data from being included in the final result.
"""
# re is already imported at module level
if not items:
return items
# Check if the original text ends with incomplete JSON patterns
# Look for patterns that suggest the last item was cut off
# Pattern 1: Text ends with incomplete string like {"text": "36
if re.search(r'\{"[^"]*"\s*:\s*"[^"]*$', original_text):
logger.debug("Detected incomplete string at end - removing last item")
return items[:-1]
# Pattern 2: Text ends with incomplete boolean like {"bool_flag": tr
if re.search(r'\{"[^"]*"\s*:\s*(true|false|tr|fa)$', original_text):
logger.debug("Detected incomplete boolean at end - removing last item")
return items[:-1]
# Pattern 3: Text ends with incomplete number like {"number": 123
if re.search(r'\{"[^"]*"\s*:\s*\d+$', original_text):
logger.debug("Detected incomplete number at end - removing last item")
return items[:-1]
# Pattern 4: Text ends with incomplete array like {"array": [1,2,3
if re.search(r'\{"[^"]*"\s*:\s*\[[^\]]*$', original_text):
logger.debug("Detected incomplete array at end - removing last item")
return items[:-1]
# Pattern 5: Text ends with incomplete object like {"obj": {"key": "val
if re.search(r'\{"[^"]*"\s*:\s*\{[^}]*$', original_text):
logger.debug("Detected incomplete object at end - removing last item")
return items[:-1]
# Pattern 6: Text ends with trailing comma (common sign of incomplete JSON)
if original_text.rstrip().endswith(','):
logger.debug("Detected trailing comma - removing last item")
return items[:-1]
# If no incomplete patterns detected, return all items
return items
def _extractGenericContent(text: str) -> List[Dict[str, Any]]:
    """
    Extract generic content when no specific section patterns are found.
    This handles cases where the JSON structure is completely broken.
    Handles incomplete strings and corrupted data.
    Excludes the last incomplete item to prevent corrupted data.
    CRITICAL: Must preserve original content_type and id from the JSON structure!

    Works in two phases:
    1. Structured phase: regex-match ``"sections": [{... "id" ... "content_type"
       ... "elements": [...`` and rebuild elements per content_type
       (code_block, table, heading, bullet_list/numbered_list, paragraph).
       If any section is rebuilt, those sections are returned.
    2. Fallback phase: scan the whole text for ``{"code": ...}``, ``{"text": ...}``
       or, finally, any quoted string of 3+ characters, and return a single
       synthetic section for the first pattern that yields items.

    Args:
        text: Raw (broken) JSON text.

    Returns:
        List of section dicts with keys "id", "content_type", "elements" and
        "order"; empty list when nothing could be extracted.

    NOTE(review): all value patterns use ``[^"]*``, which cannot match a
    double quote, so a value containing an escaped quote is captured only up
    to that quote. Captured values are not JSON-unescaped.
    """
    # re is already imported at module level
    sections = []
    # CRITICAL: First, try to extract the original section structure from the JSON
    # Look for section patterns with content_type and id preserved
    # Handle both complete and incomplete JSON (may be cut off mid-string)
    # More flexible pattern that handles incomplete structures
    # Groups: 1 = section id, 2 = content_type, 3 = raw text of the elements array
    # (lazy, ends at the first "]}" or at end of text). "id" must precede
    # "content_type", which must precede "elements", for the pattern to match.
    section_pattern = r'"sections"\s*:\s*\[\s*\{[^}]*?"id"\s*:\s*"([^"]+)"[^}]*?"content_type"\s*:\s*"([^"]+)"[^}]*?"elements"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
    section_matches = re.finditer(section_pattern, text, re.DOTALL)
    for match in section_matches:
        section_id = match.group(1)
        content_type = match.group(2)
        elements_str = match.group(3)
        # Extract elements based on content_type
        elements = []
        if content_type == "code_block":
            # Look for {"code": "..."} patterns (complete)
            code_pattern = r'\{"code"\s*:\s*"([^"]*)"(?:\s*,\s*"language"\s*:\s*"([^"]*)")?\}'
            code_matches = re.finditer(code_pattern, elements_str, re.DOTALL)
            for code_match in code_matches:
                code = code_match.group(1)
                # Group 2 is only set when the optional "language" part matched
                language = code_match.group(2) if code_match.lastindex >= 2 else None
                elem = {"code": code}
                if language:
                    elem["language"] = language
                elements.append(elem)
            # Also look for incomplete code blocks (cut off mid-string)
            # Pattern: {"code": "..." where string is not closed
            incomplete_code_pattern = r'\{"code"\s*:\s*"([^"]*?)(?:"|$)'
            incomplete_matches = re.finditer(incomplete_code_pattern, elements_str, re.DOTALL)
            for inc_match in incomplete_matches:
                code = inc_match.group(1)
                # Check if this code is already in elements (from complete match)
                # by comparing it against the same-length prefix of each known code
                if code and code not in [e.get("code", "")[:len(code)] for e in elements]:
                    # Extract language if present before the cut-off
                    language_match = re.search(r'"language"\s*:\s*"([^"]+)"', elements_str[:inc_match.end()])
                    language = language_match.group(1) if language_match else None
                    elem = {"code": code}
                    if language:
                        elem["language"] = language
                    elements.append(elem)
            # If still no elements found, try to extract code from the raw elements string
            # This handles cases where the JSON is very broken
            if not elements:
                # Look for any "code": "..." pattern, even if incomplete
                raw_code_pattern = r'"code"\s*:\s*"([^"]*)"'
                raw_code_matches = re.finditer(raw_code_pattern, elements_str, re.DOTALL)
                for raw_match in raw_code_matches:
                    code = raw_match.group(1)
                    if code:
                        elements.append({"code": code})
                # If still nothing, try to find incomplete code string
                if not elements:
                    incomplete_raw_pattern = r'"code"\s*:\s*"([^"]*?)(?:"|$)'
                    incomplete_raw_matches = re.finditer(incomplete_raw_pattern, elements_str, re.DOTALL)
                    for inc_raw_match in incomplete_raw_matches:
                        code = inc_raw_match.group(1)
                        if code:
                            elements.append({"code": code})
        elif content_type == "table":
            # Look for table elements with rows (handle incomplete JSON)
            # Pattern: {"headers": [...], "rows": [...]} or incomplete version
            # More flexible pattern that handles incomplete rows array
            # Match even if rows array is not closed
            table_pattern = r'\{\s*"headers"\s*:\s*\[([^\]]*)\]\s*,\s*"rows"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
            table_matches = re.finditer(table_pattern, elements_str, re.DOTALL)
            for table_match in table_matches:
                headers_str = table_match.group(1)
                rows_str = table_match.group(2)
                # Parse headers
                headers = [h.strip('"') for h in re.findall(r'"([^"]*)"', headers_str)]
                # Parse rows (may be incomplete - handle cut-off)
                rows = []
                # Find all complete row arrays: ["...", "..."]
                row_pattern = r'\[([^\]]*)\]'
                row_matches = list(re.finditer(row_pattern, rows_str))
                for row_match in row_matches:
                    row_str = row_match.group(1)
                    # Only quoted cells are captured; unquoted values are not matched
                    row = [cell.strip('"') for cell in re.findall(r'"([^"]*)"', row_str)]
                    if row:
                        rows.append(row)
                # Also check for incomplete last row (cut off mid-row)
                # Look for pattern like ["cell1", "cell2", "incomplete
                # Find the last occurrence of [ that doesn't have a matching ]
                if rows_str:
                    # Find all [ positions
                    open_brackets = [i for i, char in enumerate(rows_str) if char == '[']
                    close_brackets = [i for i, char in enumerate(rows_str) if char == ']']
                    # If there are more [ than ], we have an incomplete row
                    if len(open_brackets) > len(close_brackets):
                        # Find the last [ that doesn't have a matching ]
                        # (the opener at index len(close_brackets))
                        last_open = open_brackets[len(close_brackets)]
                        incomplete_row_str = rows_str[last_open+1:] # Skip the [
                        # Extract cells from incomplete row
                        incomplete_row = [cell.strip('"') for cell in re.findall(r'"([^"]*)"', incomplete_row_str)]
                        if incomplete_row and (not rows or incomplete_row != rows[-1]):
                            rows.append(incomplete_row)
                elem = {"headers": headers, "rows": rows}
                elements.append(elem)
        elif content_type == "heading":
            # Look for {"level": X, "text": "..."} patterns
            heading_pattern = r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*)"\}'
            heading_matches = re.finditer(heading_pattern, elements_str)
            for heading_match in heading_matches:
                level = int(heading_match.group(1))
                # NOTE(review): rebinds the `text` parameter; an element is
                # appended right after, so this call returns before the
                # fallback phase reads `text` again
                text = heading_match.group(2)
                elements.append({"level": level, "text": text})
        elif content_type in ["bullet_list", "numbered_list"]:
            # Look for {"items": [...]} patterns (handle incomplete JSON)
            # Pattern: {"items": [...]} or incomplete version
            # More flexible pattern that handles incomplete items array
            items_pattern = r'\{\s*"items"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
            items_matches = re.finditer(items_pattern, elements_str, re.DOTALL)
            for items_match in items_matches:
                items_str = items_match.group(1)
                # Extract all complete items (quoted strings)
                items = [item.strip('"') for item in re.findall(r'"([^"]*)"', items_str)]
                # Also check for incomplete last item (cut off mid-string)
                # Find the last occurrence of " that doesn't have a matching "
                if items_str:
                    # Count quotes - odd number means incomplete item
                    quote_count = items_str.count('"')
                    if quote_count % 2 != 0:
                        # There's an incomplete item at the end
                        # Find the last complete item and the incomplete part
                        # (position of the second-to-last quote in items_str)
                        last_complete_quote = items_str.rfind('"', 0, items_str.rfind('"'))
                        if last_complete_quote >= 0:
                            incomplete_part = items_str[last_complete_quote+1:]
                            # Extract incomplete item (everything after last complete quote)
                            incomplete_item = incomplete_part.split(',')[0].strip('"')
                            if incomplete_item and incomplete_item not in items:
                                items.append(incomplete_item)
                if items:
                    elements.append({"items": items})
        elif content_type == "paragraph":
            # Look for {"text": "..."} patterns
            text_pattern = r'\{"text"\s*:\s*"([^"]*)"\}'
            text_matches = re.finditer(text_pattern, elements_str)
            for text_match in text_matches:
                # NOTE(review): rebinds the `text` parameter (see heading branch)
                text = text_match.group(1)
                elements.append({"text": text})
        # Sections with an unhandled content_type or no recovered elements are skipped
        if elements:
            sections.append({
                "id": section_id,
                "content_type": content_type,
                "elements": elements,
                # Zero-based position among the sections recovered so far
                "order": len(sections)
            })
    # If we found sections with preserved structure, return them
    if sections:
        return sections
    # Fallback: Original logic for when structure is completely broken
    # Look for any structured content patterns
    # Each pattern below returns one synthetic section with "order": 1
    # Pattern 1: Look for code_block {"code": "..."}
    code_items = re.findall(r'\{"code"\s*:\s*"([^"]*)"\}', text)
    incomplete_code_items = re.findall(r'\{"code"\s*:\s*"([^"]*?)(?:\n|$)', text)
    all_code_items = code_items + incomplete_code_items
    # dict.fromkeys de-duplicates while keeping first-seen order; blank items are dropped
    unique_code_items = list(dict.fromkeys([item for item in all_code_items if item.strip()]))
    if unique_code_items:
        unique_code_items = _removeLastIncompleteItem(unique_code_items, text)
        if unique_code_items:
            # Try to find section ID and language from original JSON
            # (first "id" / "language" occurrence anywhere in the text)
            section_id_match = re.search(r'"id"\s*:\s*"([^"]+)"', text)
            section_id = section_id_match.group(1) if section_id_match else "section_1"
            language_match = re.search(r'"language"\s*:\s*"([^"]+)"', text)
            language = language_match.group(1) if language_match else None
            elements = [{"code": item} for item in unique_code_items]
            # The language is attached to the first element only
            if language and elements:
                elements[0]["language"] = language
            sections.append({
                "id": section_id,
                "content_type": "code_block",
                "elements": elements,
                "order": 1
            })
            return sections
    # Pattern 2: Look for list items {"text": "..."}, including incomplete ones
    list_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
    incomplete_list_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
    all_list_items = list_items + incomplete_list_items
    unique_list_items = list(dict.fromkeys([item for item in all_list_items if item.strip()]))
    if unique_list_items:
        unique_list_items = _removeLastIncompleteItem(unique_list_items, text)
        if unique_list_items:
            elements = [{"text": item} for item in unique_list_items]
            sections.append({
                "id": "section_1",
                "content_type": "list",
                "elements": elements,
                "order": 1
            })
            return sections
    # Pattern 3: Look for paragraph text {"text": "..."}, including incomplete ones
    # NOTE(review): uses the same two item regexes as Pattern 2, so it can only
    # produce items Pattern 2 already saw
    if re.search(r'\{"text"\s*:\s*"[^"]*\}', text):
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
        incomplete_text_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
        all_text_items = text_items + incomplete_text_items
        unique_text_items = list(dict.fromkeys([item for item in all_text_items if item.strip()]))
        if unique_text_items:
            unique_text_items = _removeLastIncompleteItem(unique_text_items, text)
            if unique_text_items:
                elements = [{"text": item} for item in unique_text_items]
                sections.append({
                    "id": "section_1",
                    "content_type": "paragraph",
                    "elements": elements,
                    "order": 1
                })
                return sections
    # Pattern 4: Look for any quoted strings that might be content, including incomplete ones
    if re.search(r'"([^"]{3,})"', text):
        text_items = re.findall(r'"([^"]{3,})"', text)
        incomplete_text_items = re.findall(r'"([^"]{3,}?)(?:\n|$)', text)
        all_text_items = text_items + incomplete_text_items
        # Drop strings that start with a known structural key/prefix
        content_items = [item for item in all_text_items if not item.startswith(('section_', 'doc_', 'metadata', 'split_strategy', 'source_documents', 'extraction_method', 'id', 'content_type', 'elements', 'order', 'title', 'filename'))]
        if content_items:
            content_items = _removeLastIncompleteItem(content_items, text)
            if content_items:
                # At most the first 10 items are kept
                elements = [{"text": item} for item in content_items[:10]]
                sections.append({
                    "id": "section_1",
                    "content_type": "paragraph",
                    "elements": elements,
                    "order": 1
                })
    return sections
def extractSectionsFromDocument(documentData: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract all sections from a document data structure.

    Handles both shapes:
    - nested: ``{"documents": [{"sections": [...]}, ...]}`` - the sections of
      all documents are concatenated in order;
    - flat: ``{"sections": [...]}`` - the list is returned as-is.

    When a "documents" key is present it takes precedence, even if it yields
    no sections. Malformed values (non-dict input, "documents" that is not a
    list such as ``null``, "sections" that is not a list) produce an empty
    result instead of raising.

    Args:
        documentData: Parsed JSON document.

    Returns:
        List of sections; empty when none are found.
    """
    if not isinstance(documentData, dict):
        return []

    # Nested structure: collect sections from every document
    if "documents" in documentData:
        documents = documentData["documents"]
        if not isinstance(documents, list):
            # e.g. "documents": null in repaired / model-generated JSON
            return []
        all_sections = []
        for doc in documents:
            if isinstance(doc, dict):
                sections = doc.get("sections")
                if isinstance(sections, list):
                    all_sections.extend(sections)
        return all_sections

    # Flat structure: sections directly on the root
    sections = documentData.get("sections")
    if isinstance(sections, list):
        return sections
    return []
def _extractOverlapFromElement(elem: Dict[str, Any], elemType: str) -> Optional[Dict[str, Any]]:
"""
GENERIC function to extract overlap portion from an element.
Handles elements of any size, including very long strings:
- Paragraphs: Extract last N characters/words
- Code blocks: Extract last N lines
- Tables: Extract last N rows
- Lists: Extract last N items
- Other elements: Extract representative portion
Args:
elem: Element dictionary
elemType: Element type (table, paragraph, code_block, etc.)
Returns:
Overlap element dictionary with size-limited content, or None
"""
if not isinstance(elem, dict):
return None
# Get content (handle both flat and nested structures)
content = elem.get("content", {}) if isinstance(elem.get("content"), dict) else {}
if elemType == "table":
rows = elem.get("rows", []) or content.get("rows", [])
headers = elem.get("headers", []) or content.get("headers", [])
if rows:
# Extract last 3-5 rows as overlap (enough for context, not too large)
overlapRowCount = min(5, len(rows))
overlapRows = rows[-overlapRowCount:]
overlapElem = {
"type": "table",
"content": {
"headers": headers,
"rows": overlapRows
}
}
return overlapElem
elif elemType in ["bullet_list", "numbered_list"]:
items = elem.get("items", []) or content.get("items", [])
if items:
# Extract last 5-10 items as overlap
overlapItemCount = min(10, len(items))
overlapItems = items[-overlapItemCount:]
overlapElem = {
"type": elemType,
"content": {
"items": overlapItems
}
}
return overlapElem
elif elemType == "paragraph":
text = elem.get("text", "") or content.get("text", "")
if text:
# Extract last portion of text
# For very long text, use last 300-500 characters
# For shorter text, use all of it
maxOverlapChars = 500
minOverlapChars = 100
if len(text) > maxOverlapChars:
# Very long text - extract last portion
# Try to break at word boundary for readability
textSnippet = text[-maxOverlapChars:]
# Find first space/newline to start from word boundary
firstSpace = textSnippet.find(' ')
if firstSpace > 0 and firstSpace < 50:
textSnippet = textSnippet[firstSpace + 1:]
overlapText = textSnippet
elif len(text) > minOverlapChars:
# Medium text - use last portion
overlapText = text[-minOverlapChars:]
else:
# Short text - use all
overlapText = text
overlapElem = {
"type": "paragraph",
"content": {
"text": overlapText
}
}
return overlapElem
elif elemType == "code_block":
code = elem.get("code", "") or content.get("code", "")
if code:
# Extract last N lines of code
codeLines = code.split('\n')
# Use last 10-20 lines as overlap (enough context for continuation)
overlapLineCount = min(20, len(codeLines))
overlapLines = codeLines[-overlapLineCount:]
overlapCode = '\n'.join(overlapLines)
overlapElem = {
"type": "code_block",
"content": {
"code": overlapCode
}
}
return overlapElem
elif elemType == "heading":
# Headings are usually short - return as-is
return elem
elif elemType == "image":
# Images are usually small - return as-is
return elem
else:
# Generic element - try to extract a representative portion
# Convert to JSON and limit size
elemJson = json.dumps(elem, ensure_ascii=False)
# If element is very large, try to extract key fields only
if len(elemJson) > 1000:
# Extract only essential fields
overlapElem = {
"type": elemType,
"id": elem.get("id"),
"content": "..." # Indicate truncated content
}
return overlapElem
# Small element - return as-is
return elem
def buildContinuationContext(
allSections: List[Dict[str, Any]],
lastRawResponse: Optional[str] = None,
useCaseId: Optional[str] = None
) -> Dict[str, Any]:
"""
Build context information from accumulated sections for continuation prompt.
Returns summary of delivered data and cut-off point for continuation.
Args:
allSections: List of ALL sections accumulated across ALL iterations
lastRawResponse: Raw JSON response from last iteration (can be broken/incomplete)
useCaseId: Optional use case ID to determine expected JSON structure
Returns:
Dict with delivered_summary, cut_off_element, element_before_cutoff, template_structure,
last_complete_part, incomplete_part, structure_context
"""
context = {
"section_count": len(allSections),
}
# Build summary of delivered data (per-section counts)
summary_lines = []
summary_lines.append("Following data has already been delivered:\n")
summary_items = [] # Collect items for truncation check
for section in allSections:
section_id = section.get("id")
# CRITICAL: If section has no ID, omit it from summary
if not section_id:
continue
content_type = section.get("content_type", "")
elements = section.get("elements", [])
if content_type == "heading":
# Collect all heading elements with level and text
heading_elements = []
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
level = elem.get("level", "")
text = elem.get("text", "")
if text:
heading_elements.append(f"level {level}: {text}")
elif isinstance(elements, dict):
level = elements.get("level", "")
text = elements.get("text", "")
if text:
heading_elements.append(f"level {level}: {text}")
if heading_elements:
summary_items.append(f'- heading "{section_id}" {", ".join(heading_elements)}')
elif content_type == "paragraph":
# Count text elements
text_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict) and elem.get("text"):
text_count += 1
elif isinstance(elements, dict) and elements.get("text"):
text_count = 1
if text_count > 0:
summary_items.append(f'- paragraph with {text_count} text(s)')
elif content_type in ["bullet_list", "numbered_list"]:
# Count items across all elements
item_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
items = elem.get("items", [])
if isinstance(items, list):
item_count += len(items)
elif isinstance(elements, dict):
items = elements.get("items", [])
if isinstance(items, list):
item_count = len(items)
if item_count > 0:
summary_items.append(f'- bullet_list with {item_count} items')
elif content_type == "table":
# Count rows across all elements
row_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
rows = elem.get("rows", [])
if isinstance(rows, list):
row_count += len(rows)
elif isinstance(elements, dict):
rows = elements.get("rows", [])
if isinstance(rows, list):
row_count = len(rows)
if row_count > 0:
summary_items.append(f'- table "{section_id}" with {row_count} rows')
elif content_type == "code_block":
# Count code lines across all elements
line_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
code = elem.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
line_count += len(lines)
elif isinstance(elements, dict):
code = elements.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
line_count = len(lines)
if line_count > 0:
line_word = "line" if line_count == 1 else "lines"
summary_items.append(f'- code_block "{section_id}" with {line_count} code {line_word}')
# If no sections extracted but we have raw response, indicate that previous response was broken
if len(summary_items) == 0 and lastRawResponse:
summary_items.append("- Previous response was incomplete/broken JSON - please continue from where it stopped")
# CRITICAL: If summary is too long, truncate: show first 10 and last 10 items
if len(summary_items) > 20:
first_10 = summary_items[:10]
last_10 = summary_items[-10:]
summary_lines.extend(first_10)
summary_lines.append(f"... (truncated {len(summary_items) - 20} items) ...")
summary_lines.extend(last_10)
else:
summary_lines.extend(summary_items)
context["delivered_summary"] = "\n".join(summary_lines)
# Extract cut-off point using new algorithm
# 1. Loop over all sections until finding incomplete section
# 2. In incomplete section, loop through elements until finding cut-off element
# CRITICAL: There is always only ONE section incomplete (JSON cut-off point)
cut_off_element = None
element_before_cutoff = None
if lastRawResponse:
try:
# CRITICAL: Always try to find incomplete section from raw JSON
# Even if JSON can be parsed, it might be incomplete (cut off mid-element)
raw_stripped = stripCodeFences(lastRawResponse.strip()).strip()
# Check if response is just a fragment (not full JSON structure)
# Fragments are continuation content that should be appended to the last incomplete element
is_fragment = not (raw_stripped.strip().startswith('{') or raw_stripped.strip().startswith('['))
if is_fragment:
# Response is a fragment - it continues the last incomplete element
# Find the last incomplete element from allSections
if allSections:
last_section = allSections[-1]
elements = last_section.get("elements", [])
if isinstance(elements, list) and elements:
# Get the last element (which should be incomplete)
last_elem = elements[-1]
if isinstance(last_elem, dict):
# The fragment continues this element
# Show the fragment as cut_off_element
cut_off_element = raw_stripped
# Show the element before (if there is one)
if len(elements) > 1:
element_before_cutoff = json.dumps(elements[-2])
else:
element_before_cutoff = json.dumps(last_elem)
else:
# Response is full JSON - use standard extraction
# Strategy 1: Try to find incomplete section using structured parsing
incomplete_section = _findIncompleteSectionInRaw(raw_stripped)
if incomplete_section:
cut_off_element, element_before_cutoff = _extractCutOffElements(incomplete_section, raw_stripped)
# Strategy 2: If no incomplete section found, extract directly from raw JSON
# This handles cases where JSON is cut off mid-element within a complete section
if not cut_off_element:
cut_off_element, element_before_cutoff = _extractCutOffElementsFromRaw(raw_stripped, allSections)
except Exception as e:
logger.debug(f"Error extracting cut-off point: {e}")
context["element_before_cutoff"] = element_before_cutoff
context["cut_off_element"] = cut_off_element
# Extract overlap information for continuation prompt
# GENERIC overlap extraction: handles elements of any size, including long strings
# Strategy: Extract last N elements, but if an element is very large, extract only a portion
overlapElements = []
overlapString = ""
if allSections:
# Get last section
lastSection = allSections[-1]
elements = lastSection.get("elements", [])
if isinstance(elements, list) and len(elements) > 0:
# Extract last 2-3 complete elements as overlap context
# This helps the AI understand what was already delivered
overlapCount = min(3, len(elements))
overlapElements = elements[-overlapCount:]
# Build overlap string showing these elements (with size limits for large elements)
overlapStrings = []
for elem in overlapElements:
if isinstance(elem, dict):
elemType = elem.get("type", "unknown")
overlapElem = _extractOverlapFromElement(elem, elemType)
if overlapElem:
overlapStrings.append(json.dumps(overlapElem, ensure_ascii=False))
else:
# Non-dict element - show as-is (but limit size)
elemStr = json.dumps(elem, ensure_ascii=False)
if len(elemStr) > 500:
elemStr = elemStr[:500] + "..."
overlapStrings.append(elemStr)
if overlapStrings:
overlapString = ",\n".join(overlapStrings)
context["overlap_elements"] = overlapElements
context["overlap_string"] = overlapString
# Store raw JSON response for prompt builder to check
if lastRawResponse:
context["last_raw_json"] = lastRawResponse
# Extract JSON structure context for continuation prompt
# This provides: template structure, last complete part, incomplete part, structure context
try:
structureContext = extractJsonStructureContext(lastRawResponse, useCaseId)
context["template_structure"] = structureContext.get("template_structure", "")
context["last_complete_part"] = structureContext.get("last_complete_part", "")
context["incomplete_part"] = structureContext.get("incomplete_part", "")
context["structure_context"] = structureContext.get("structure_context", "")
# Log if extraction succeeded but returned empty values
if not context["template_structure"] and not context["structure_context"]:
logger.debug(f"JSON structure context extraction returned empty values for useCaseId={useCaseId}")
except Exception as e:
logger.warning(f"Error extracting JSON structure context: {e}", exc_info=True)
context["template_structure"] = ""
context["last_complete_part"] = ""
context["incomplete_part"] = ""
context["structure_context"] = ""
else:
context["last_raw_json"] = ""
context["template_structure"] = ""
context["last_complete_part"] = ""
context["incomplete_part"] = ""
context["structure_context"] = ""
return context
def extractJsonStructureContext(
    incompleteJson: str,
    useCaseId: Optional[str] = None
) -> Dict[str, Any]:
    """
    Collect the structural context of a truncated JSON response for a continuation prompt.

    The input is normalized with normalizeJsonText, stripped of code fences and
    trimmed; everything before the first '{' or '[' is discarded. The remaining
    text is handed to the private helpers that compute each part of the result.

    Args:
        incompleteJson: Incomplete JSON string (may be cut off mid-element)
        useCaseId: Optional use case ID forwarded to the template extraction

    Returns:
        Dict with four string entries:
        - template_structure: value returned by _extractTemplateStructure
        - last_complete_part: first value returned by _extractLastCompleteAndIncomplete
        - incomplete_part: second value returned by _extractLastCompleteAndIncomplete
        - structure_context: value returned by _extractStructureContext
        All four are empty strings when the input is empty/blank or contains
        no '{' or '[' after normalization.
    """
    from modules.shared.jsonUtils import stripCodeFences, normalizeJsonText

    context: Dict[str, Any] = dict.fromkeys(
        ("template_structure", "last_complete_part", "incomplete_part", "structure_context"),
        "",
    )

    if not incompleteJson or not incompleteJson.strip():
        return context

    cleaned = stripCodeFences(normalizeJsonText(incompleteJson)).strip()
    if not cleaned:
        return context

    # Position of the first structural opener; -1 when the text has none.
    firstOpener = next((pos for pos, ch in enumerate(cleaned) if ch in '{['), -1)
    if firstOpener == -1:
        return context
    payload = cleaned[firstOpener:]

    # Step 1: structure-only template
    context["template_structure"] = _extractTemplateStructure(payload, useCaseId)

    # Step 2: last complete part and incomplete part
    lastComplete, incompletePart = _extractLastCompleteAndIncomplete(payload)
    context["last_complete_part"] = lastComplete
    context["incomplete_part"] = incompletePart

    # Step 3: description of where in the structure the break happened
    context["structure_context"] = _extractStructureContext(payload, incompletePart, lastComplete)
    return context
def _extractTemplateStructure(jsonContent: str, useCaseId: Optional[str] = None) -> str:
"""
Extract template JSON structure (structure only, no content).
Examples:
- {"documents": [{"chapters": [{"sections": [...]}]}]}
- {"elements": [{"type": "...", "content": {...}}]}
"""
import json
import re
# Try to parse JSON to understand structure
try:
# Try to close and parse
closed = closeJsonStructures(jsonContent)
parsed = json.loads(closed)
# Build template structure (keys only, no content)
template = _buildStructureTemplate(parsed)
return json.dumps(template, indent=2, ensure_ascii=False)
except Exception:
# If parsing fails, try to extract structure from string
# Look for top-level keys
topLevelKeys = []
# Pattern: "key": { or "key": [
keyPattern = r'"([^"]+)"\s*:\s*[{\[]'
matches = re.findall(keyPattern, jsonContent)
if matches:
topLevelKeys = matches[:3] # Take first 3 keys
# Build template based on use case or detected keys
if useCaseId == "chapter_structure":
return json.dumps({"documents": [{"chapters": [{"id": "", "title": "", "level": 0}]}]}, indent=2, ensure_ascii=False)
elif useCaseId == "section_content":
return json.dumps({"elements": [{"type": "", "content": {}}]}, indent=2, ensure_ascii=False)
elif useCaseId == "code_structure":
return json.dumps({"files": [{"id": "", "filename": "", "fileType": ""}]}, indent=2, ensure_ascii=False)
elif topLevelKeys:
# Build generic template
template = {}
for key in topLevelKeys:
template[key] = []
return json.dumps(template, indent=2, ensure_ascii=False)
else:
return json.dumps({}, indent=2, ensure_ascii=False)
def _buildStructureTemplate(obj: Any, maxDepth: int = 3) -> Any:
"""
Build structure template from parsed JSON (keys only, no content).
"""
if isinstance(obj, dict):
template = {}
for key, value in obj.items():
if isinstance(value, (dict, list)):
template[key] = _buildStructureTemplate(value, maxDepth - 1) if maxDepth > 0 else None
else:
# Keep key but use empty value of same type
if isinstance(value, str):
template[key] = ""
elif isinstance(value, (int, float)):
template[key] = 0
elif isinstance(value, bool):
template[key] = False
else:
template[key] = None
return template
elif isinstance(obj, list) and obj:
# Use first element as template
return [_buildStructureTemplate(obj[0], maxDepth - 1) if maxDepth > 0 else None]
else:
return None
def _extractLastCompleteAndIncomplete(jsonContent: str) -> Tuple[str, str]:
    """
    Split JSON text at the end of its last balanced top-level structure.

    The text is scanned once, skipping string literals, while open braces and
    brackets are counted separately. Every closer that brings both counters
    back to zero marks a candidate split position; the last one wins.

    Returns:
        Tuple of (last_complete_part, incomplete_part) as strings.
        - When a split position exists, the first item is the result of
          _findLastCompleteElement on the text before the split (or that text
          itself if the helper returns an empty string), and the second item is
          _buildIncompleteContext evaluated at the split position.
        - Otherwise the first item is "" and the second is
          _buildIncompleteContext evaluated at position 0.
    """
    openBraces = 0
    openBrackets = 0
    balancedEnd = -1
    insideString = False
    skipNext = False

    for pos, ch in enumerate(jsonContent):
        if skipNext:
            # Character following a backslash is never interpreted
            skipNext = False
            continue
        if ch == '\\':
            skipNext = True
            continue
        if ch == '"':
            insideString = not insideString
            continue
        if insideString:
            continue

        if ch == '{':
            openBraces += 1
        elif ch == '[':
            openBrackets += 1
        elif ch in ('}', ']'):
            if ch == '}':
                openBraces -= 1
            else:
                openBrackets -= 1
            if openBraces == 0 and openBrackets == 0:
                # Found end of complete structure
                balancedEnd = pos + 1

    if balancedEnd <= 0:
        # No complete structure found - everything is incomplete
        return "", _buildIncompleteContext(jsonContent, 0)

    completeText = jsonContent[:balancedEnd]
    lastElement = _findLastCompleteElement(completeText)
    brokenContext = _buildIncompleteContext(jsonContent, balancedEnd)
    return (lastElement or completeText), brokenContext
def _findLastCompleteElement(jsonStr: str) -> str:
"""
Find the last complete element in JSON string.
"""
import json
# Try to parse and extract last element
try:
closed = closeJsonStructures(jsonStr)
parsed = json.loads(closed)
# If it's a dict with arrays, get last element from first array
if isinstance(parsed, dict):
for key, value in parsed.items():
if isinstance(value, list) and value:
lastElem = value[-1]
return json.dumps(lastElem, indent=2, ensure_ascii=False)
# If it's a list, get last element
if isinstance(parsed, list) and parsed:
lastElem = parsed[-1]
return json.dumps(lastElem, indent=2, ensure_ascii=False)
except Exception:
pass
# Fallback: try to find last complete object using brace matching
braceCount = 0
startPos = -1
lastCompleteEnd = -1
for i, char in enumerate(jsonStr):
if char == '{':
if braceCount == 0:
startPos = i
braceCount += 1
elif char == '}':
braceCount -= 1
if braceCount == 0 and startPos >= 0:
lastCompleteEnd = i + 1
if lastCompleteEnd > 0:
return jsonStr[startPos:lastCompleteEnd]
return ""
def _buildIncompleteContext(jsonContent: str, breakPosition: int) -> str:
    """
    Build a textual context view of the JSON around the cut point.

    Intended layout (from the original design notes):
    1. Cut piece level: element of a list (the incomplete element at cut point)
    2. Parent of the cut element: the list/array containing the cut piece (with cut point shown)
    3. Last complete object on the same level like the cut object (if exists) PLUS further previous
       content from the json string (maximum 1000 characters)
    4. Next parent levels, until root. Further 1000 characters to show content (but only complete
       objects - if too big, not to show), then only showing metadata until root

    Example output structure:
        {
          "elements": [
            {
              "content": {
                "rows": [
                  [37847, 37853, 37861, 37871, 37879, 37889, 37897, 37907, 37951, 37957],
                  [37957, 37963, 37967, 37987, 37991, <-- CUT POINT (incomplete)

    Args:
        jsonContent: Full JSON text being analysed
        breakPosition: Character offset at which the complete part ends

    Returns:
        A multi-line string. When breakPosition is <= 0 or >= len(jsonContent)
        the last 500 characters of jsonContent (or all of it when shorter) are
        returned. When _findStructureHierarchy yields nothing, a
        "Cut point context:" block with up to 500 characters before and 100
        after the break is returned.
    """
    import json
    import re
    if breakPosition <= 0 or breakPosition >= len(jsonContent):
        # Invalid break position - show last 500 chars
        return jsonContent[-500:] if len(jsonContent) > 500 else jsonContent
    contextParts = []
    # Levels returned by _findStructureHierarchy, ordered root first; the last
    # entry is treated as the level containing the cut.
    hierarchy = _findStructureHierarchy(jsonContent, breakPosition)
    if not hierarchy:
        # Fallback: show simple context
        contextParts.append("Cut point context:\n")
        contextStart = max(0, breakPosition - 500)
        contextParts.append(jsonContent[contextStart:breakPosition + 100])
        return "\n".join(contextParts)
    # Step 1: Extract cut piece (incomplete element at cut point)
    # NOTE(review): cutPiece is computed here but not used further down.
    cutPiece = _extractCutPiece(jsonContent, breakPosition)
    # Step 2: Find the cut level (the array/object containing the cut piece)
    cutLevel = hierarchy[-1] if hierarchy else None
    if not cutLevel:
        # Fallback (same output as the empty-hierarchy case above)
        contextParts.append("Cut point context:\n")
        contextStart = max(0, breakPosition - 500)
        contextParts.append(jsonContent[contextStart:breakPosition + 100])
        return "\n".join(contextParts)
    # Build context following the exact structure requested
    # Show hierarchical structure from root to cut point
    # Extract the actual JSON structure from root to cut point
    # Build the full hierarchical structure showing:
    # 4. Parent levels until root (with content/metadata limits)
    # 3. Last complete elements on same level + previous content (max 1000 chars)
    # 2. Parent container (the list) with cut piece
    # 1. Cut piece
    resultLines = []
    # Build structure from root to cut level
    # Extract actual JSON content for each level
    for i, level in enumerate(hierarchy):
        levelType = level['type']
        start = level['start_pos']
        # For the last (cut) level the span ends at the break, not at the
        # level's recorded end position.
        end = level['end_pos'] if i < len(hierarchy) - 1 else breakPosition
        key = level.get('key')
        depth = level['depth']
        indent = " " * depth
        if i < len(hierarchy) - 1:
            # Parent levels - show opening structure
            levelContent = jsonContent[start:end]
            # If content is too large, show only metadata
            if len(levelContent) > 1000:
                # Show opening with key
                # NOTE(review): `opening` is assigned but not used in this branch.
                opening = jsonContent[start:min(start + 100, end)]
                if key:
                    resultLines.append(f'{indent}"{key}": {{')
                else:
                    resultLines.append(f'{indent}{{')
                resultLines.append(f'{indent} ...')
            else:
                # Show opening structure
                if key:
                    # Find where the key's value starts
                    keyEnd = jsonContent.find(':', start)
                    if keyEnd > 0:
                        opening = jsonContent[start:min(keyEnd + 50, end)]
                        resultLines.append(f'{indent}{opening}')
                else:
                    # No key: show the first 50 characters of the level
                    opening = jsonContent[start:min(start + 50, end)]
                    resultLines.append(f'{indent}{opening}')
        else:
            # Cut level - show detailed context
            cutLevelType = levelType
            cutLevelStart = start
            cutLevelKey = key
            cutLevelDepth = depth
            # Show key if available
            # NOTE(review): an opening brace is emitted here even when the cut
            # level is an array - confirm this is intended.
            if cutLevelKey:
                resultLines.append(f'{indent}"{cutLevelKey}": {{')
                indent += " "
            if cutLevelType == 'array':
                # Show array opening
                arrayKey = _findKeyBefore(jsonContent, cutLevelStart)
                if arrayKey:
                    resultLines.append(f'{indent}"{arrayKey}": [')
                else:
                    resultLines.append(f'{indent}[')
                indent += " "
                # 3. Show last complete elements on same level + previous content (max 1000 chars)
                contentBeforeBreak = jsonContent[cutLevelStart:breakPosition]
                lastCompleteElements = _extractLastCompleteArrayElementsWithContext(
                    contentBeforeBreak, jsonContent, cutLevelStart, maxChars=1000
                )
                if lastCompleteElements:
                    resultLines.append(lastCompleteElements)
                # 2. Show parent container (the list) with cut piece
                cutArrayElement = _findCutArrayElement(jsonContent, breakPosition, cutLevelStart)
                if cutArrayElement:
                    resultLines.append(f'{indent}{cutArrayElement} <-- CUT POINT (incomplete)')
                else:
                    # Fallback: show what we have at break point
                    cutPart = jsonContent[breakPosition:breakPosition + 200].strip()
                    resultLines.append(f'{indent}{cutPart} <-- CUT POINT (incomplete)')
                # Close the array (step the indent back before emitting ']')
                indent = indent[:-2] if len(indent) >= 2 else indent
                resultLines.append(f'{indent}]')
            else:
                # Object at cut level: up to 500 chars before the break plus
                # up to 200 chars after it on a single line
                cutPart = jsonContent[breakPosition:breakPosition + 200].strip()
                preview = jsonContent[cutLevelStart:breakPosition]
                preview = preview[-500:] if len(preview) > 500 else preview
                resultLines.append(f'{indent}{preview}... {cutPart} <-- CUT POINT (incomplete)')
    # Close all parent structures (innermost parent first, root last)
    for i in range(len(hierarchy) - 2, -1, -1):
        level = hierarchy[i]
        depth = level['depth']
        indent = " " * depth
        resultLines.append(f'{indent}}}')
    contextParts.append("\n".join(resultLines))
    return "\n".join(contextParts)
def _extractCutPiece(jsonContent: str, breakPosition: int) -> str:
"""Extract the incomplete piece at the cut point."""
# Get characters after break point (incomplete part)
afterBreak = jsonContent[breakPosition:breakPosition + 200].strip()
# Find where the incomplete piece ends (next comma, bracket, brace, or end)
for i, char in enumerate(afterBreak):
if char in [',', ']', '}', '\n']:
return afterBreak[:i].strip()
return afterBreak[:50].strip() # Limit to 50 chars if no delimiter found
def _findStructureHierarchy(jsonContent: str, breakPosition: int) -> List[Dict[str, Any]]:
"""
Find the structure hierarchy backwards from break point to root.
Returns list of level info dicts, from root to cut level.
Each level has: type, start_pos, end_pos, parent_start, content_preview
"""
hierarchy = []
# Track depth and positions
braceDepth = 0
bracketDepth = 0
inString = False
escapeNext = False
# Find all structure boundaries before break point
structureStack = [] # Stack of (type, start_pos, depth)
for i in range(breakPosition):
if i >= len(jsonContent):
break
char = jsonContent[i]
if escapeNext:
escapeNext = False
continue
if char == '\\':
escapeNext = True
continue
if char == '"':
inString = not inString
continue
if not inString:
if char == '{':
structureStack.append(('object', i, braceDepth + bracketDepth))
braceDepth += 1
elif char == '}':
if structureStack and structureStack[-1][0] == 'object':
_, start, depth = structureStack.pop()
hierarchy.append({
'type': 'object',
'start_pos': start,
'end_pos': i + 1,
'depth': depth,
'key': _findKeyBefore(jsonContent, start)
})
braceDepth -= 1
elif char == '[':
structureStack.append(('array', i, braceDepth + bracketDepth))
bracketDepth += 1
elif char == ']':
if structureStack and structureStack[-1][0] == 'array':
_, start, depth = structureStack.pop()
hierarchy.append({
'type': 'array',
'start_pos': start,
'end_pos': i + 1,
'depth': depth,
'key': _findKeyBefore(jsonContent, start)
})
bracketDepth -= 1
# Sort by depth (root first) and filter to get hierarchy from root to cut
hierarchy.sort(key=lambda x: x['depth'])
# Find which level contains the break point
cutLevelIndex = -1
for i, level in enumerate(hierarchy):
if level['start_pos'] < breakPosition <= level['end_pos']:
cutLevelIndex = i
break
if cutLevelIndex >= 0:
# Return hierarchy from root to cut level
return hierarchy[:cutLevelIndex + 1]
return []
def _findKeyBefore(jsonContent: str, pos: int) -> Optional[str]:
"""Find the key name before a structure start position."""
# Look backwards for "key": pattern
before = jsonContent[max(0, pos - 100):pos]
match = re.search(r'"([^"]+)"\s*:\s*[{\[]\s*$', before)
if match:
return match.group(1)
return None
def _formatLevelContext(level: Dict[str, Any], jsonContent: str, maxContentChars: int = 1000) -> str:
"""Format a level in the hierarchy for display."""
levelType = level['type']
start = level['start_pos']
end = level['end_pos']
key = level.get('key')
# Get content for this level
levelContent = jsonContent[start:end]
# If content is too large, show only metadata
if len(levelContent) > maxContentChars:
# Show opening and key if available
if key:
return f' "{key}": {levelType} (content too large, {len(levelContent)} chars)'
else:
return f' {levelType} (content too large, {len(levelContent)} chars)'
else:
# Show full content (formatted)
indent = " " * level['depth']
if key:
return f'{indent}"{key}": {levelContent[:maxContentChars]}'
else:
return f'{indent}{levelContent[:maxContentChars]}'
def _formatCutLevelContextDetailed(level: Dict[str, Any], cutPiece: str, jsonContent: str, breakPosition: int) -> str:
"""
Format the cut level showing detailed hierarchy as per user instruction:
1. Cut piece level: element of a list (the incomplete element)
2. Parent of the cut element: the list containing the cut piece (with cut point shown)
3. Last complete object on the same level like the cut object (if exists) PLUS further
previous content from the json string (maximum 1000 characters)
"""
levelType = level['type']
start = level['start_pos']
key = level.get('key')
# Get content before break point in this level
contentBeforeBreak = jsonContent[start:breakPosition]
result = []
if levelType == 'array':
# Step 3: Show last complete elements on same level + previous content (max 1000 chars)
# Extract last complete array elements with context (up to 1000 chars)
lastCompleteElements = _extractLastCompleteArrayElementsWithContext(
contentBeforeBreak, jsonContent, start, maxChars=1000
)
if lastCompleteElements:
result.append("3. Last complete elements on same level (plus previous content, max 1000 chars):")
result.append(lastCompleteElements)
result.append("")
# Step 2: Show parent container (the list) with cut piece
# Find the array element that contains the cut piece
cutArrayElement = _findCutArrayElement(jsonContent, breakPosition, start)
if cutArrayElement:
result.append("2. Parent container (list containing cut piece):")
result.append(f" {cutArrayElement}")
else:
# Fallback: show cut piece directly
cutPart = jsonContent[breakPosition:breakPosition + 200].strip()
result.append("2. Parent container (list containing cut piece):")
result.append(f" {cutPart}")
result.append("")
# Step 1: Show cut piece (incomplete element at cut point)
result.append("1. Cut piece level (incomplete element at cut point):")
if cutPiece:
result.append(f" {cutPiece}")
else:
cutPart = jsonContent[breakPosition:breakPosition + 50].strip()
result.append(f" {cutPart}")
else:
# Object - show structure with cut point
result.append("Cut point in object:")
cutPart = jsonContent[breakPosition:breakPosition + 200].strip()
preview = contentBeforeBreak[-500:] if len(contentBeforeBreak) > 500 else contentBeforeBreak
result.append(f" {preview}... {cutPart} <-- CUT POINT")
return "\n".join(result)
def _formatParentLevelContext(level: Dict[str, Any], jsonContent: str, maxContentChars: int = 1000) -> str:
"""
Format a parent level showing content (if small enough) or metadata only.
Used for levels above the cut level, showing path to root.
"""
levelType = level['type']
start = level['start_pos']
end = level['end_pos']
key = level.get('key')
# Get content for this level
levelContent = jsonContent[start:end]
# If content is too large, show only metadata
if len(levelContent) > maxContentChars:
# Show opening structure with key if available
opening = jsonContent[start:start + 200].strip()
if key:
return f' "{key}": {levelType} (content too large, {len(levelContent)} chars)\n {opening}...'
else:
return f' {levelType} (content too large, {len(levelContent)} chars)\n {opening}...'
else:
# Show full content (formatted, but limit to maxContentChars)
content = levelContent[:maxContentChars]
if key:
return f' "{key}": {content}'
else:
return f' {content}'
def _extractLastCompleteArrayElementsWithContext(
arrayContent: str, fullJsonContent: str, arrayStart: int, maxChars: int = 1000
) -> str:
"""
Extract last complete array elements PLUS further previous content from json string (max 1000 chars).
This shows:
- Last complete elements on the same level as the cut element
- Additional previous content from the JSON string (up to maxChars total)
"""
# First, extract last complete elements from arrayContent
completeElements = []
currentElement = ""
braceDepth = 0
bracketDepth = 0
inString = False
escapeNext = False
totalChars = 0
# Parse backwards to find complete elements
for i in range(len(arrayContent) - 1, -1, -1):
char = arrayContent[i]
if escapeNext:
escapeNext = False
currentElement = char + currentElement
continue
if char == '\\':
escapeNext = True
currentElement = char + currentElement
continue
if char == '"':
inString = not inString
currentElement = char + currentElement
continue
if not inString:
if char == '}':
braceDepth += 1
currentElement = char + currentElement
elif char == '{':
braceDepth -= 1
currentElement = char + currentElement
if braceDepth == 0 and bracketDepth == 0:
# Found complete element
element = currentElement.strip()
if element and element[0] in ['{', '[']:
completeElements.insert(0, element)
totalChars += len(element)
if totalChars >= maxChars:
break
currentElement = ""
elif char == ']':
bracketDepth += 1
currentElement = char + currentElement
elif char == '[':
bracketDepth -= 1
currentElement = char + currentElement
if braceDepth == 0 and bracketDepth == 0:
# Found complete element
element = currentElement.strip()
if element and element[0] == '[':
completeElements.insert(0, element)
totalChars += len(element)
if totalChars >= maxChars:
break
currentElement = ""
elif char == ',' and braceDepth == 0 and bracketDepth == 0:
# Element boundary
if currentElement.strip():
element = currentElement.strip()
if element and element[0] in ['{', '[', '"']:
completeElements.insert(0, element)
totalChars += len(element)
if totalChars >= maxChars:
break
currentElement = ""
else:
currentElement = char + currentElement
# Format the elements
if completeElements:
# Show last few complete elements (up to maxChars)
formattedElements = []
charsUsed = 0
for elem in reversed(completeElements): # Show from newest to oldest
if charsUsed + len(elem) <= maxChars:
formattedElements.insert(0, elem)
charsUsed += len(elem)
else:
break
if formattedElements:
# Format as JSON array rows
result = []
for elem in formattedElements:
result.append(f" {elem},")
return "\n".join(result)
return ""
def _findCutArrayElement(jsonContent: str, breakPosition: int, arrayStart: int) -> Optional[str]:
"""Find the array element that contains the cut piece."""
# Look backwards from break position to find the start of the current array element
braceDepth = 0
bracketDepth = 0
inString = False
escapeNext = False
elementStart = -1
# Search backwards from break position
for i in range(breakPosition - 1, arrayStart - 1, -1):
if i < 0:
break
char = jsonContent[i]
if escapeNext:
escapeNext = False
continue
if char == '\\':
escapeNext = True
continue
if char == '"':
inString = not inString
continue
if not inString:
if char == '}':
braceDepth += 1
elif char == '{':
braceDepth -= 1
if braceDepth == 0 and bracketDepth == 0:
elementStart = i
break
elif char == ']':
bracketDepth += 1
elif char == '[':
bracketDepth -= 1
if braceDepth == 0 and bracketDepth == 0:
elementStart = i
break
elif char == ',' and braceDepth == 0 and bracketDepth == 0:
# Found element boundary
elementStart = i + 1
break
if elementStart >= 0:
# Extract the element (including incomplete part)
elementContent = jsonContent[elementStart:breakPosition + 100].strip()
# Clean up - remove leading comma if present
if elementContent.startswith(','):
elementContent = elementContent[1:].strip()
return elementContent[:300] # Limit length
return None
def _extractLastCompleteArrayElements(arrayContent: str, maxChars: int = 1000) -> str:
"""Extract last complete array elements, up to maxChars."""
# Count complete elements from the end
elements = []
currentElement = ""
braceDepth = 0
bracketDepth = 0
inString = False
escapeNext = False
totalChars = 0
# Parse backwards to find complete elements
for i in range(len(arrayContent) - 1, -1, -1):
char = arrayContent[i]
if escapeNext:
escapeNext = False
currentElement = char + currentElement
continue
if char == '\\':
escapeNext = True
currentElement = char + currentElement
continue
if char == '"':
inString = not inString
currentElement = char + currentElement
continue
if not inString:
if char == '}':
braceDepth += 1
currentElement = char + currentElement
elif char == '{':
braceDepth -= 1
currentElement = char + currentElement
if braceDepth == 0 and bracketDepth == 0:
# Found complete element
element = currentElement.strip()
if element and element[0] in ['{', '[']:
elements.insert(0, element)
totalChars += len(element)
if totalChars >= maxChars:
break
currentElement = ""
elif char == ']':
bracketDepth += 1
currentElement = char + currentElement
elif char == '[':
bracketDepth -= 1
currentElement = char + currentElement
if braceDepth == 0 and bracketDepth == 0:
# Found complete element
element = currentElement.strip()
if element and element[0] == '[':
elements.insert(0, element)
totalChars += len(element)
if totalChars >= maxChars:
break
currentElement = ""
elif char == ',' and braceDepth == 0 and bracketDepth == 0:
# Element boundary
if currentElement.strip():
element = currentElement.strip()
if element and element[0] in ['{', '[', '"']:
elements.insert(0, element)
totalChars += len(element)
if totalChars >= maxChars:
break
currentElement = ""
else:
currentElement = char + currentElement
if elements:
indent = " "
formatted = ",\n".join([f"{indent}{elem}" for elem in elements[-5:]]) # Show last 5 elements
if len(elements) > 5:
formatted = f"... ({len(elements) - 5} more elements) ...\n{formatted}"
return formatted
return ""
def _extractStructureContext(jsonContent: str, incompletePart: str, lastCompletePart: str = "") -> str:
"""
Extract structure context showing WHERE in the structure the last complete and incomplete elements are.
Returns a clear description of the structure context for the broken element.
"""
import json
import re
if not incompletePart:
# No incomplete part extracted - try to show context from raw JSON
try:
# Show last part of JSON to indicate where it broke
lastPart = jsonContent[-300:] if len(jsonContent) > 300 else jsonContent
return f"Structure context unavailable. Last part of response:\n{lastPart}"
except Exception:
return "Structure context unavailable - response was completely broken"
# Find where incomplete part starts
incompleteStart = jsonContent.find(incompletePart)
if incompleteStart == -1:
incompleteStart = len(jsonContent)
# Try to extract the structure context showing the broken element
try:
# Get the part before incomplete to understand structure
beforeIncomplete = jsonContent[:incompleteStart]
# Try to find the array/object context where the break occurred
# Look for the last complete structure before the break
structureContext = ""
# Try to parse what we have before the incomplete part
try:
closed = closeJsonStructures(beforeIncomplete)
parsed = json.loads(closed)
# Build structure showing where we are
if isinstance(parsed, dict) and "elements" in parsed:
elements = parsed.get("elements", [])
if isinstance(elements, list):
structureContext = f"Structure: elements array with {len(elements)} complete elements\n"
structureContext += f"Break occurred in element at index {len(elements)}"
else:
structureContext = "Structure: elements (not an array)"
else:
structureContext = "Structure: " + json.dumps(_buildStructureContext(parsed), indent=2, ensure_ascii=False)
except Exception:
# Can't parse - show raw context
structureContext = f"Structure parsing failed. Context before break:\n{beforeIncomplete[-200:]}"
return structureContext
except Exception:
# Fallback: show minimal context
return f"Structure context unavailable. Break occurred at position {incompleteStart} in JSON string"
def _findElementPath(parsed: Any, elementStr: str, originalJson: str, isIncomplete: bool = False) -> str:
"""
Find the path to an element in the parsed JSON structure.
Returns a path like "elements[2]" or "documents[0].chapters[1].sections[3]"
"""
import json
if not elementStr or not elementStr.strip():
return ""
# Strategy: Find position in original JSON string, then determine path from structure
elementStart = originalJson.find(elementStr.strip())
if elementStart == -1:
return ""
# Find the array context by looking backwards from element position
beforeElement = originalJson[:elementStart]
# Find the nearest array declaration before this position
# Look for patterns like "elements": [ or "chapters": [
arrayPattern = r'"(\w+)"\s*:\s*\['
matches = list(re.finditer(arrayPattern, beforeElement))
if not matches:
return ""
# Get the most recent array (closest to element)
lastMatch = matches[-1]
arrayName = lastMatch.group(1)
arrayStartPos = lastMatch.end()
# Count complete array elements before this position
arrayContent = beforeElement[arrayStartPos:]
# Count complete objects (balanced braces) - each complete object is an array element
braceCount = 0
elementIndex = 0
inString = False
escapeNext = False
lastCompleteObjectEnd = -1
for i, char in enumerate(arrayContent):
if escapeNext:
escapeNext = False
continue
if char == '\\':
escapeNext = True
continue
if char == '"':
inString = not inString
continue
if not inString:
if char == '{':
if braceCount == 0:
# Start of new object
elementIndex += 1
braceCount += 1
elif char == '}':
braceCount -= 1
if braceCount == 0:
# End of complete object
lastCompleteObjectEnd = i
# Determine the index
# If we're looking for incomplete element, it's at the current elementIndex
# If we're looking for last complete element, it's at elementIndex - 1
if isIncomplete:
index = elementIndex
else:
index = elementIndex - 1 if elementIndex > 0 else 0
# Build the full path by traversing the parsed structure
def _buildPathToArray(obj: Any, targetArrayName: str, targetIndex: int, currentPath: str = "") -> Optional[str]:
"""Recursively find path to array element."""
if isinstance(obj, dict):
for key, value in obj.items():
newPath = f"{currentPath}.{key}" if currentPath else key
if key == targetArrayName and isinstance(value, list):
# Found the target array
if 0 <= targetIndex < len(value):
return f"{newPath}[{targetIndex}]"
elif targetIndex >= len(value):
# Index beyond array - return array path with index
return f"{newPath}[{targetIndex}]"
result = _buildPathToArray(value, targetArrayName, targetIndex, newPath)
if result:
return result
elif isinstance(obj, list):
for i, item in enumerate(obj):
result = _buildPathToArray(item, targetArrayName, targetIndex, currentPath)
if result:
return result
return None
# Try to find full path in parsed structure
fullPath = _buildPathToArray(parsed, arrayName, index)
if fullPath:
return fullPath
# Fallback: return simple array path
return f"{arrayName}[{index}]"
def _buildStructureContext(obj: Any, maxDepth: int = 5) -> Any:
"""
Build structure context (metadata only, no content).
Similar to _buildStructureTemplate but focuses on parent structure.
"""
if isinstance(obj, dict):
structure = {}
for key, value in obj.items():
if isinstance(value, (dict, list)):
structure[key] = _buildStructureContext(value, maxDepth - 1) if maxDepth > 0 else []
else:
# Skip content values - only keep structure
pass
return structure
elif isinstance(obj, list) and obj:
# Return empty list structure (no content)
return []
else:
return None
def _findIncompleteSectionInRaw(raw_json: str) -> Optional[Dict[str, Any]]:
"""
Find the incomplete section in raw JSON.
CRITICAL: JSON can be cut off mid-element (e.g., {"text": "20327,20)
We need to find the last section and check if it's incomplete.
"""
try:
# Try to parse documents structure
if '"documents"' in raw_json:
# Find last document
doc_start = raw_json.rfind('"documents"')
if doc_start >= 0:
doc_section = raw_json[doc_start:]
# Try to find sections array
sections_start = doc_section.find('"sections"')
if sections_start >= 0:
sections_section = doc_section[sections_start:]
# Find sections array start
array_start = sections_section.find('[')
if array_start >= 0:
# Find all complete sections
section_objects = []
depth = 0
section_start = None
for i in range(array_start, len(sections_section)):
if sections_section[i] == '{':
if depth == 0:
section_start = i
depth += 1
elif sections_section[i] == '}':
depth -= 1
if depth == 0 and section_start is not None:
# Found complete section
section_str = sections_section[section_start:i+1]
try:
section_obj = json.loads('{' + section_str + '}')
section_objects.append(section_obj)
except:
pass
section_start = None
# CRITICAL: Check if there's content after the last complete section
# If JSON ends mid-element, the last section is incomplete
if section_objects:
# Find position after last complete section
last_section_end = sections_section.rfind('}')
if last_section_end >= 0:
# Check if there's more content after the last }
remaining_after_last_section = sections_section[last_section_end+1:].strip()
# Remove closing brackets/braces that might be there
remaining_after_last_section = remaining_after_last_section.lstrip('],}')
# If there's still content (like incomplete element), section is incomplete
if remaining_after_last_section and not remaining_after_last_section.startswith(']'):
# Last section is incomplete - return it
return section_objects[-1]
# Also check: if we can't parse the full sections array, last section is incomplete
try:
# Try to parse the sections array
sections_array_str = sections_section[array_start:]
json.loads(sections_array_str)
# Parsed successfully - all sections complete
return None
except:
# Cannot parse - last section is incomplete
return section_objects[-1] if section_objects else None
except Exception as e:
logger.debug(f"Error finding incomplete section: {e}")
return None
def _extractCutOffElements(incomplete_section: Dict[str, Any], raw_json: str) -> Tuple[Optional[str], Optional[str]]:
"""Extract cut-off element and element before from incomplete section."""
cut_off_element = None
element_before_cutoff = None
elements = incomplete_section.get("elements", [])
if not elements:
return None, None
# CRITICAL: In 99% of cases, JSON is cut off mid-string or mid-number
# Deliver the cut-off part AS-IS (don't try to "complete" it)
if isinstance(elements, list):
# Find last element (might be incomplete)
if elements:
# Edge case: If cut-off is in first element, just show cut-off element
if len(elements) == 1:
# Only one element - might be cut-off
last_elem = elements[0]
if isinstance(last_elem, dict):
# Check if element contains nested content (e.g., code_block with JSON string)
cut_off_element = _extractCutOffFromElement(last_elem, raw_json)
if not cut_off_element:
cut_off_element = json.dumps(last_elem)
else:
cut_off_element = str(last_elem)
else:
# Multiple elements - last one might be cut-off, get element before
element_before_cutoff = json.dumps(elements[-2]) if isinstance(elements[-2], dict) else str(elements[-2])
last_elem = elements[-1]
if isinstance(last_elem, dict):
# Check if element contains nested content
cut_off_element = _extractCutOffFromElement(last_elem, raw_json)
if not cut_off_element:
cut_off_element = json.dumps(last_elem)
else:
cut_off_element = str(last_elem)
elif isinstance(elements, dict):
# Single element - might be cut-off
cut_off_element = _extractCutOffFromElement(elements, raw_json)
if not cut_off_element:
cut_off_element = json.dumps(elements)
# If we couldn't extract from parsed structure, extract from raw JSON
if not cut_off_element:
# Extract the last incomplete part from raw JSON
# Find the last incomplete string/number/array
# re is already imported at module level
# Look for incomplete string at the end
incomplete_match = re.search(r'"([^"]*?)(?:"|$)', raw_json[-500:], re.DOTALL)
if incomplete_match:
cut_off_element = incomplete_match.group(1)
else:
# Look for incomplete number
number_match = re.search(r'(\d+\.?\d*)(?:\s*[,}\]]|$)', raw_json[-200:])
if number_match:
cut_off_element = number_match.group(1)
return cut_off_element, element_before_cutoff
def _extractCutOffFromElement(element: Dict[str, Any], raw_json: str) -> Optional[str]:
"""
Extract cut-off point from within an element (e.g., code_block with JSON string, table with incomplete rows).
This helps identify where exactly to continue within nested structures.
"""
# re is already imported at module level
# Check for code_block with nested JSON
if "code" in element:
code_content = element.get("code", "")
if isinstance(code_content, str) and code_content.strip().startswith("{"):
# This is JSON inside a code string - find where it was cut off
# Look for the last complete value in the raw JSON
# Find the code string in raw JSON
code_match = re.search(r'"code"\s*:\s*"([^"]*?)(?:"|$)', raw_json[-2000:], re.DOTALL)
if code_match:
code_str = code_match.group(1)
# Try to find the last complete value in the JSON string
# Look for patterns like: [2, 3, 5, ... 17929, (cut off here)
array_match = re.search(r'\[([^\]]*?)(?:\]|$)', code_str, re.DOTALL)
if array_match:
array_content = array_match.group(1)
# Find last complete number/item
# Match: number followed by comma or end
last_complete = re.findall(r'(\d+)\s*[,]', array_content)
if last_complete:
last_num = last_complete[-1]
# Return context showing where to continue
return f'{{"code": "{{\\"primes\\": [... up to {last_num}, <CONTINUE FROM HERE>]"}}'
# Check for table with incomplete rows
if "rows" in element:
rows = element.get("rows", [])
if isinstance(rows, list) and rows:
# Find last complete row in raw JSON
rows_str = str(rows)
# Try to find where rows were cut off
last_row_match = re.search(r'\[([^\]]*?)(?:\]|$)', raw_json[-1000:], re.DOTALL)
if last_row_match:
return f'{{"rows": [... last complete row shown above, <CONTINUE FROM HERE>]}}'
# Check for list items
if "items" in element:
items = element.get("items", [])
if isinstance(items, list) and items:
# Find last complete item
last_item_match = re.search(r'"([^"]*?)"\s*(?:,|\])', raw_json[-1000:], re.DOTALL)
if last_item_match:
return f'{{"items": [... last item shown above, <CONTINUE FROM HERE>]}}'
return None
def _extractCutOffElementsFromRaw(raw_json: str, allSections: List[Dict[str, Any]]) -> Tuple[Optional[str], Optional[str]]:
"""
Extract cut-off element directly from raw JSON when section parsing fails.
This handles ALL cases where JSON is cut off:
- Mid-element (incomplete element object)
- Mid-string/number within an element
- Mid-array within an element (e.g., rows in table, items in list)
- Mid-nested structure
CRITICAL: In 99% of cases, JSON is cut off mid-string or mid-number - deliver as-is.
"""
cut_off_element = None
element_before_cutoff = None
try:
# Find the last "elements" array in raw JSON
if '"elements"' in raw_json:
# Find the last occurrence of "elements"
last_elements_pos = raw_json.rfind('"elements"')
if last_elements_pos >= 0:
elements_section = raw_json[last_elements_pos:]
# Find the array start '['
array_start = elements_section.find('[')
if array_start >= 0:
# Use a simpler approach: find all element objects by tracking braces
# This works even if elements contain nested arrays/objects
element_strings = []
depth = 0
in_string = False
escape_next = False
elem_start = None
for i in range(array_start, len(elements_section)):
char = elements_section[i]
# Track string state (ignore brackets/braces inside strings)
if escape_next:
escape_next = False
continue
if char == '\\':
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char == '{':
if depth == 0:
elem_start = i
depth += 1
elif char == '}':
depth -= 1
if depth == 0 and elem_start is not None:
# Found complete element (all braces closed, even if nested arrays are incomplete)
elem_str = elements_section[elem_start:i+1]
element_strings.append(elem_str)
elem_start = None
# Now analyze what we found
if element_strings:
last_elem = element_strings[-1]
last_complete_pos = elements_section.rfind('}')
# Check if there's content after the last complete element
if last_complete_pos >= 0:
remaining = elements_section[last_complete_pos+1:].strip()
remaining_clean = remaining.lstrip(',').strip().lstrip(']').strip()
# Case 1: Incomplete element after last complete one
if remaining_clean and not remaining_clean.startswith(']'):
incomplete_start = last_complete_pos + 1
while incomplete_start < len(elements_section) and elements_section[incomplete_start] in ' \n\t\r,':
incomplete_start += 1
if incomplete_start < len(elements_section):
incomplete_elem_str = elements_section[incomplete_start:].strip()
incomplete_elem_str = incomplete_elem_str.rstrip(']').rstrip('}').rstrip()
cut_off_element = incomplete_elem_str
element_before_cutoff = element_strings[-1]
# Case 2: Last element itself is incomplete (cut off in nested structure like rows, items, etc.)
else:
# Check if JSON is incomplete by analyzing structure
# Count unclosed brackets/braces in elements section (ignoring strings)
elements_section_braces = 0
elements_section_brackets = 0
in_str = False
esc = False
for char in elements_section:
if esc:
esc = False
continue
if char == '\\':
esc = True
continue
if char == '"':
in_str = not in_str
continue
if not in_str:
if char == '{':
elements_section_braces += 1
elif char == '}':
elements_section_braces -= 1
elif char == '[':
elements_section_brackets += 1
elif char == ']':
elements_section_brackets -= 1
# Also check raw JSON for unclosed structures
raw_braces = 0
raw_brackets = 0
in_str = False
esc = False
for char in raw_json:
if esc:
esc = False
continue
if char == '\\':
esc = True
continue
if char == '"':
in_str = not in_str
continue
if not in_str:
if char == '{':
raw_braces += 1
elif char == '}':
raw_braces -= 1
elif char == '[':
raw_brackets += 1
elif char == ']':
raw_brackets -= 1
# Check if last element can be parsed
last_elem_parsable = False
try:
json.loads(last_elem)
last_elem_parsable = True
except:
pass
# Determine if last element is incomplete
is_incomplete = False
# If there are unclosed structures, element is incomplete
if elements_section_brackets > 0 or elements_section_braces > 0 or raw_brackets > 0 or raw_braces > 0:
is_incomplete = True
# If element cannot be parsed, it's incomplete
elif not last_elem_parsable:
is_incomplete = True
# Check if JSON ends mid-element by finding where element ends in raw JSON
elif last_elem_parsable:
# Find where this element ends in the raw JSON
elem_end_marker = last_elem[-100:] if len(last_elem) > 100 else last_elem
elem_end_in_raw = raw_json.rfind(elem_end_marker)
if elem_end_in_raw >= 0:
actual_elem_end = elem_end_in_raw + len(last_elem)
if actual_elem_end < len(raw_json):
remaining_after_elem = raw_json[actual_elem_end:].strip()
remaining_clean = remaining_after_elem.lstrip(',').strip()
# If there's unexpected content, element is incomplete
if remaining_clean and not remaining_clean.startswith(']'):
is_incomplete = True
if is_incomplete:
cut_off_element = last_elem
if len(element_strings) >= 2:
element_before_cutoff = element_strings[-2]
elif len(element_strings) == 1:
element_before_cutoff = last_elem
# Case 3: No complete elements found, but there's an incomplete one
elif elem_start is not None:
# There's an incomplete element that hasn't been closed
incomplete_elem_str = elements_section[elem_start:].strip()
cut_off_element = incomplete_elem_str
# No element before (this is the first/only element)
element_before_cutoff = None
except Exception as e:
logger.debug(f"Error extracting cut-off elements from raw JSON: {e}")
return cut_off_element, element_before_cutoff
def parseJsonWithModel(jsonString: str, modelClass: Type[T]) -> T:
    """
    Turn a JSON string into an instance of the given Pydantic model.

    Processing stages (all helpers live in this module):
    1. extractJsonString() isolates the JSON text (code fences, extra text)
    2. tryParseJson() parses it; a dict is passed to the model as keyword
       arguments, for a list its first item is used
    3. if that fails, repairBrokenJson() is applied and the result is parsed
       and instantiated the same way

    Args:
        jsonString: JSON string to parse (may contain code fences, extra text, etc.)
        modelClass: Pydantic model class to instantiate

    Returns:
        Instance of modelClass

    Raises:
        ValueError: If the input is empty, contains no JSON, cannot be parsed
            or repaired, or does not validate against the model
    """
    if not jsonString:
        raise ValueError(f"Cannot parse empty JSON string for {modelClass.__name__}")

    # Stage 1: isolate the JSON text
    candidate = extractJsonString(jsonString)
    if not candidate or candidate.strip() == "":
        raise ValueError(f"No JSON found in string for {modelClass.__name__}")

    # Stage 2: regular parse, then model construction
    payload, parseError, cleaned = tryParseJson(candidate)
    if parseError is None and payload is not None:
        try:
            if isinstance(payload, dict):
                return modelClass(**payload)
            if not isinstance(payload, list):
                raise ValueError(f"Parsed JSON is not a dict or list: {type(payload)}")
            if not payload:
                raise ValueError(f"Empty list cannot be parsed as {modelClass.__name__}")
            # A list is represented by its first item
            return modelClass(**payload[0])
        except ValidationError as e:
            logger.error(f"Validation error parsing {modelClass.__name__}: {e}")
            raise ValueError(f"Invalid data for {modelClass.__name__}: {e}")
        except Exception as e:
            # Also wraps the ValueErrors raised above for unsupported shapes
            logger.error(f"Error creating {modelClass.__name__} instance: {e}")
            raise ValueError(f"Failed to create {modelClass.__name__} instance: {e}")

    # Stage 3: repair the text and try once more
    logger.warning(f"Initial JSON parsing failed, attempting repair for {modelClass.__name__}")
    repaired = repairBrokenJson(candidate)
    if repaired:
        reparsed, reparseError, _ = tryParseJson(json.dumps(repaired))
        if reparseError is None and reparsed is not None:
            try:
                if isinstance(reparsed, dict):
                    return modelClass(**reparsed)
                if isinstance(reparsed, list) and reparsed:
                    return modelClass(**reparsed[0])
            except ValidationError as e:
                logger.error(f"Validation error parsing repaired {modelClass.__name__}: {e}")
                raise ValueError(f"Invalid repaired data for {modelClass.__name__}: {e}")
            except Exception as e:
                # Other construction errors are logged only; the final error below is raised
                logger.error(f"Error creating {modelClass.__name__} from repaired JSON: {e}")

    # Stage 4: nothing worked
    logger.error(f"Failed to parse JSON for {modelClass.__name__}. Cleaned JSON preview: {cleaned[:200]}...")
    raise ValueError(f"Failed to parse or validate JSON for {modelClass.__name__}. JSON may be malformed or incomplete.")