gateway/modules/shared/jsonUtils.py

1303 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Union, Type, TypeVar
from pydantic import BaseModel, ValidationError
logger = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseModel)
def stripCodeFences(text: str) -> str:
    """Strip surrounding ```/```json fences and outer whitespace, if present."""
    if not text:
        return text
    trimmed = text.strip()
    if not trimmed.startswith("```"):
        return trimmed
    # Drop the opening fence together with an optional language tag
    # (responses commonly begin with "```json\n").
    newline_pos = trimmed.find('\n', 3)
    if newline_pos != -1:
        trimmed = trimmed[newline_pos + 1:]
    # Drop a trailing closing fence when one exists (it may be absent
    # if the response was cut off).
    if trimmed.endswith("```"):
        trimmed = trimmed[:-3]
    return trimmed.strip()
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring; otherwise return trimmed input."""
    if not text:
        return text
    stripped = text.strip()
    # Locate the earliest structural opener ('{' or '[').
    opener_positions = [pos for pos in (stripped.find('{'), stripped.find('[')) if pos != -1]
    if not opener_positions:
        return stripped
    begin = min(opener_positions)
    # Walk forward matching openers/closers; tolerate stray or mismatched
    # closers by simply skipping them (input may be malformed).
    pending: List[str] = []
    for idx in range(begin, len(stripped)):
        token = stripped[idx]
        if token in '{[':
            pending.append(token)
        elif token in '}]':
            if not pending:
                continue
            top = pending.pop()
            # Mismatched pair: ignore and keep scanning.
            if (top == '{') != (token == '}'):
                continue
            if not pending:
                return stripped[begin:idx + 1].strip()
    # Never balanced: hand back the trimmed original.
    return stripped
def normalizeJsonText(text: str) -> str:
    """
    Light normalization: remove a UTF-8 BOM and map typographic ("smart")
    quotes to their ASCII equivalents.

    BUGFIX: the curly-quote literals had been lost from the source (they
    rendered as empty strings), and ``str.replace('', '"')`` inserts a
    quote between every character of the input, destroying it.  The code
    points are now written as explicit escapes so they cannot be silently
    stripped again by editors/viewers.
    """
    if not text:
        return text
    s = text
    # Remove UTF-8 BOM if present
    if s.startswith('\ufeff'):
        s = s.lstrip('\ufeff')
    # Normalize smart quotes to straight quotes
    s = (s.replace('\u201c', '"')    # left double quotation mark
          .replace('\u201d', '"')    # right double quotation mark
          .replace('\u2018', "'")    # left single quotation mark
          .replace('\u2019', "'"))   # right single quotation mark
    return s
def extractJsonString(text: str) -> str:
    """Run the cleanup pipeline: normalize, strip fences, isolate balanced JSON."""
    # Apply each cleanup stage in order; each one is a no-op on already
    # clean input.
    for transform in (normalizeJsonText, stripCodeFences, extractFirstBalancedJson):
        text = transform(text)
    return text.strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Extract and parse JSON; return (obj, error, cleaned_str)."""
    # Accept raw bytes: decode leniently before cleaning.
    if isinstance(text, bytes):
        try:
            text = text.decode('utf-8', errors='replace')
        except Exception:
            text = str(text)
    cleaned = extractJsonString(text or "")
    try:
        parsed = json.loads(cleaned)
    except Exception as exc:
        # Hand the error back to the caller instead of raising; repair
        # strategies probe with many candidate strings.
        return None, exc, cleaned
    return parsed, None, cleaned
def repairBrokenJson(text: str) -> Optional[Dict[str, Any]]:
    """
    Attempt to repair broken JSON using multiple strategies.
    Generic solution that works for any content type.
    Returns the best repair attempt or None if all fail.
    IMPORTANT: This function tries to preserve ALL data by avoiding truncation.
    Only uses truncation as a last resort when structure closing fails.

    Args:
        text: Possibly truncated/malformed JSON text (e.g. a cut-off model response).

    Returns:
        The repaired top-level dict, or None when every strategy fails.
    """
    if not text:
        return None
    # Strategy 1: Structure closing - close incomplete structures WITHOUT truncating
    # This preserves all data and should be tried first
    closedStr = closeJsonStructures(text)
    obj, err, _ = tryParseJson(closedStr)
    if err is None and isinstance(obj, dict):
        sections = extractSectionsFromDocument(obj)
        if sections:
            logger.info(f"Repaired JSON using structure closing (preserved all data, found {len(sections)} sections)")
            return obj
        else:
            # Structure closing worked but no sections found - still return it
            logger.info("Repaired JSON using structure closing (preserved all data, but no sections found)")
            return obj
    # Strategy 2: Try to extract sections from the entire text using regex
    # This handles cases where the JSON structure is broken but content is intact
    # NOTE: _extractSectionsRegex may truncate, but we try it before progressive parsing
    extractedSections = _extractSectionsRegex(text)
    if extractedSections:
        logger.info(f"Extracted {len(extractedSections)} sections using regex")
        # Wrap the recovered sections in the canonical document envelope
        # so callers see the same shape as a fully parsed response.
        return {
            "metadata": {
                "split_strategy": "single_document",
                "source_documents": [],
                "extraction_method": "ai_generation"
            },
            "documents": [{"sections": extractedSections}]
        }
    # Strategy 3: Progressive parsing - try to find longest valid prefix (TRUNCATES DATA)
    # WARNING: This strategy truncates the input and loses data after the truncation point
    # Only use as last resort when other strategies fail
    logger.warning("Structure closing and regex extraction failed, trying progressive parsing (WILL TRUNCATE DATA)")
    bestResult = None
    bestValidLength = 0
    # Try different step sizes to find the best valid JSON
    # (coarse steps first for speed; step 1 guarantees the longest valid prefix)
    for stepSize in [100, 50, 10, 1]:
        for i in range(len(text), 0, -stepSize):
            testStr = text[:i]
            closedStr = closeJsonStructures(testStr)
            obj, err, _ = tryParseJson(closedStr)
            if err is None and isinstance(obj, dict):
                bestResult = obj
                bestValidLength = i
                logger.debug(f"Progressive parsing success at length {i} (step: {stepSize}) - DATA TRUNCATED AT POSITION {i}")
                break
        if bestResult:
            break
    if bestResult:
        logger.warning(f"Repaired JSON using progressive parsing (valid length: {bestValidLength}, DATA LOST AFTER THIS POINT)")
        # Check if we have sections in the result
        sections = extractSectionsFromDocument(bestResult)
        if sections:
            logger.info(f"Progressive parsing found {len(sections)} sections")
            return bestResult
        else:
            # No sections found in progressive parsing, try to extract from broken part
            logger.info("Progressive parsing found no sections, trying to extract from broken part")
            extractedSections = _extractSectionsRegex(text[bestValidLength:])
            if extractedSections:
                logger.info(f"Extracted {len(extractedSections)} sections from broken part")
                # Merge with the valid part
                if "documents" not in bestResult:
                    bestResult["documents"] = []
                if not bestResult["documents"]:
                    bestResult["documents"] = [{"sections": []}]
                bestResult["documents"][0]["sections"].extend(extractedSections)
                return bestResult
    logger.warning("All repair strategies failed")
    return None
def closeJsonStructures(text: str) -> str:
    """
    Close incomplete JSON structures by adding missing closing brackets.
    Also handles unterminated strings by closing them.

    BUGFIXES over the previous counting approach:
    - Closers are now appended in correct nesting order.  The old code
      appended all '}' before all ']', so '{"a": [1' became the invalid
      '{"a": [1}]' instead of '{"a": [1]}'.
    - Quote parity is tracked with escape awareness, so an escaped quote
      (\\") inside a string no longer triggers a spurious closing quote.
    - Braces/brackets inside string literals are ignored, so '{"a": "b}"'
      style content no longer corrupts the bracket balance.

    Args:
        text: Possibly truncated JSON text.

    Returns:
        The input with an unterminated trailing string closed and all
        unmatched openers closed, innermost first.  Balanced input is
        returned unchanged.
    """
    if not text:
        return text
    result = text
    # Single scan tracking (a) whether we are inside a string literal,
    # honoring backslash escapes, and (b) a stack of unmatched openers.
    unclosed: List[str] = []
    in_string = False
    escaped = False
    for ch in result:
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in '{[':
            unclosed.append(ch)
        elif ch == '}':
            # Ignore stray/mismatched closers; we only repair, never delete.
            if unclosed and unclosed[-1] == '{':
                unclosed.pop()
        elif ch == ']':
            if unclosed and unclosed[-1] == '[':
                unclosed.pop()
    # Terminate a string that was cut off mid-value.
    if in_string:
        result += '"'
    # Close remaining structures innermost-first to keep nesting valid.
    for opener in reversed(unclosed):
        result += '}' if opener == '{' else ']'
    return result
def _extractSectionsRegex(text: str) -> List[Dict[str, Any]]:
    """
    Extract sections from broken/incomplete JSON using structural parsing.
    ROBUST APPROACH: Uses JSON repair and parsing instead of fragile regex patterns.
    Works for any content type, nested structures, and incomplete JSON.
    NOTE: This function is called FROM repairBrokenJson, so it must NOT call repairBrokenJson
    to avoid circular dependency. Instead, it implements its own repair strategies.
    IMPORTANT: Tries to preserve data by using structure closing first before truncation.

    BUGFIX: previously fell off the end with an implicit ``None`` when all
    strategies failed, despite the ``List`` return annotation; it now
    always returns a list (possibly empty).  Strategy comment numbering
    was also de-duplicated.

    Args:
        text: Raw, possibly badly broken JSON text.

    Returns:
        A list of section dicts, or [] when nothing could be recovered.
    """
    # Strategy 1: Try structure closing WITHOUT truncation first (preserves all data)
    closed_str = closeJsonStructures(text)
    obj, err, _ = tryParseJson(closed_str)
    if err is None and isinstance(obj, dict):
        extracted_sections = extractSectionsFromDocument(obj)
        if extracted_sections:
            logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections using structure closing (preserved all data)")
            return extracted_sections
    # Strategy 2: Try progressive parsing to find longest valid JSON prefix (TRUNCATES DATA)
    # WARNING: This truncates the input and loses data
    # Only use if structure closing failed
    logger.debug("_extractSectionsRegex: Structure closing failed, trying progressive parsing (WILL TRUNCATE)")
    best_result = None
    for step_size in [1000, 500, 100, 50, 10]:
        for i in range(len(text), 0, -step_size):
            test_str = text[:i]
            closed_str = closeJsonStructures(test_str)
            obj, err, _ = tryParseJson(closed_str)
            if err is None and isinstance(obj, dict):
                extracted_sections = extractSectionsFromDocument(obj)
                if extracted_sections:
                    logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections using progressive parsing at length {i} (DATA TRUNCATED)")
                    return extracted_sections
                # Remember the first (longest) valid prefix even without sections
                if not best_result:
                    best_result = obj
    # Strategy 3: Try to find balanced JSON and parse it
    balanced_json_str = extractFirstBalancedJson(text)
    if balanced_json_str and balanced_json_str != text.strip():
        obj, err, _ = tryParseJson(balanced_json_str)
        if err is None and isinstance(obj, dict):
            extracted_sections = extractSectionsFromDocument(obj)
            if extracted_sections:
                logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections from balanced JSON")
                return extracted_sections
    # Strategy 4: If we found a valid JSON object but no sections, try to extract sections from it
    if best_result:
        extracted_sections = extractSectionsFromDocument(best_result)
        if extracted_sections:
            logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections from best result")
            return extracted_sections
    # Strategy 5: Last resort - try generic content extraction (only if nothing else worked)
    logger.debug(f"_extractSectionsRegex: All structural parsing failed, trying generic content extraction")
    sections = _extractGenericContent(text)
    if sections:
        logger.debug(f"_extractSectionsRegex: Generic content extraction found {len(sections)} sections")
        return sections
    # BUGFIX: explicit empty result instead of implicit None
    return []
def _removeLastIncompleteItem(items: List[str], original_text: str) -> List[str]:
    """
    Drop the final extracted item when the source text appears truncated.

    Checks the tail of ``original_text`` against signatures of values that
    were cut off mid-way (string, boolean, number, array, object) plus a
    dangling trailing comma; if any signature matches, the last item is
    assumed corrupted and removed.  Otherwise the list is returned intact.
    """
    import re
    if not items:
        return items
    # (regex anchored at end-of-text, human-readable label) pairs.
    truncation_signatures = [
        (r'\{"[^"]*"\s*:\s*"[^"]*$', 'incomplete string'),
        (r'\{"[^"]*"\s*:\s*(true|false|tr|fa)$', 'incomplete boolean'),
        (r'\{"[^"]*"\s*:\s*\d+$', 'incomplete number'),
        (r'\{"[^"]*"\s*:\s*\[[^\]]*$', 'incomplete array'),
        (r'\{"[^"]*"\s*:\s*\{[^}]*$', 'incomplete object'),
    ]
    for signature, label in truncation_signatures:
        if re.search(signature, original_text):
            logger.debug(f"Detected {label} at end - removing last item")
            return items[:-1]
    # A trailing comma is a common sign the emitter stopped mid-list.
    if original_text.rstrip().endswith(','):
        logger.debug("Detected trailing comma - removing last item")
        return items[:-1]
    # No truncation signature found: keep everything.
    return items
def _extractGenericContent(text: str) -> List[Dict[str, Any]]:
    """
    Extract generic content when no specific section patterns are found.
    This handles cases where the JSON structure is completely broken.
    Handles incomplete strings and corrupted data.
    Excludes the last incomplete item to prevent corrupted data.
    CRITICAL: Must preserve original content_type and id from the JSON structure!

    BUGFIXES:
    - Previously fell off the end (implicit None) when no pattern matched,
      despite the List annotation; now always returns a list.
    - The ``text`` parameter was shadowed/clobbered by loop variables in the
      heading and paragraph branches; those locals are renamed.

    Args:
        text: Raw, possibly badly broken JSON text.

    Returns:
        A list of section dicts (possibly empty), never None.
    """
    import re
    sections = []
    # CRITICAL: First, try to extract the original section structure from the JSON
    # Look for section patterns with content_type and id preserved
    # Handle both complete and incomplete JSON (may be cut off mid-string)
    # More flexible pattern that handles incomplete structures
    section_pattern = r'"sections"\s*:\s*\[\s*\{[^}]*?"id"\s*:\s*"([^"]+)"[^}]*?"content_type"\s*:\s*"([^"]+)"[^}]*?"elements"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
    section_matches = re.finditer(section_pattern, text, re.DOTALL)
    for match in section_matches:
        section_id = match.group(1)
        content_type = match.group(2)
        elements_str = match.group(3)
        # Extract elements based on content_type
        elements = []
        if content_type == "code_block":
            # Look for {"code": "..."} patterns (complete)
            code_pattern = r'\{"code"\s*:\s*"([^"]*)"(?:\s*,\s*"language"\s*:\s*"([^"]*)")?\}'
            code_matches = re.finditer(code_pattern, elements_str, re.DOTALL)
            for code_match in code_matches:
                code = code_match.group(1)
                language = code_match.group(2) if code_match.lastindex >= 2 else None
                elem = {"code": code}
                if language:
                    elem["language"] = language
                elements.append(elem)
            # Also look for incomplete code blocks (cut off mid-string)
            # Pattern: {"code": "..." where string is not closed
            incomplete_code_pattern = r'\{"code"\s*:\s*"([^"]*?)(?:"|$)'
            incomplete_matches = re.finditer(incomplete_code_pattern, elements_str, re.DOTALL)
            for inc_match in incomplete_matches:
                code = inc_match.group(1)
                # Skip if this code is already a prefix of a complete match
                if code and code not in [e.get("code", "")[:len(code)] for e in elements]:
                    # Extract language if present before the cut-off
                    language_match = re.search(r'"language"\s*:\s*"([^"]+)"', elements_str[:inc_match.end()])
                    language = language_match.group(1) if language_match else None
                    elem = {"code": code}
                    if language:
                        elem["language"] = language
                    elements.append(elem)
            # If still no elements found, try to extract code from the raw elements string
            # This handles cases where the JSON is very broken
            if not elements:
                # Look for any "code": "..." pattern, even if incomplete
                raw_code_pattern = r'"code"\s*:\s*"([^"]*)"'
                raw_code_matches = re.finditer(raw_code_pattern, elements_str, re.DOTALL)
                for raw_match in raw_code_matches:
                    code = raw_match.group(1)
                    if code:
                        elements.append({"code": code})
                # If still nothing, try to find incomplete code string
                if not elements:
                    incomplete_raw_pattern = r'"code"\s*:\s*"([^"]*?)(?:"|$)'
                    incomplete_raw_matches = re.finditer(incomplete_raw_pattern, elements_str, re.DOTALL)
                    for inc_raw_match in incomplete_raw_matches:
                        code = inc_raw_match.group(1)
                        if code:
                            elements.append({"code": code})
        elif content_type == "table":
            # Look for table elements with rows (handle incomplete JSON)
            # Pattern: {"headers": [...], "rows": [...]} or incomplete version
            # Match even if rows array is not closed
            table_pattern = r'\{\s*"headers"\s*:\s*\[([^\]]*)\]\s*,\s*"rows"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
            table_matches = re.finditer(table_pattern, elements_str, re.DOTALL)
            for table_match in table_matches:
                headers_str = table_match.group(1)
                rows_str = table_match.group(2)
                # Parse headers
                headers = [h.strip('"') for h in re.findall(r'"([^"]*)"', headers_str)]
                # Parse rows (may be incomplete - handle cut-off)
                rows = []
                # Find all complete row arrays: ["...", "..."]
                row_pattern = r'\[([^\]]*)\]'
                row_matches = list(re.finditer(row_pattern, rows_str))
                for row_match in row_matches:
                    row_str = row_match.group(1)
                    row = [cell.strip('"') for cell in re.findall(r'"([^"]*)"', row_str)]
                    if row:
                        rows.append(row)
                # Also check for incomplete last row (cut off mid-row)
                # Find the last occurrence of [ that doesn't have a matching ]
                if rows_str:
                    open_brackets = [i for i, char in enumerate(rows_str) if char == '[']
                    close_brackets = [i for i, char in enumerate(rows_str) if char == ']']
                    # If there are more [ than ], we have an incomplete row
                    if len(open_brackets) > len(close_brackets):
                        # Find the last [ that doesn't have a matching ]
                        last_open = open_brackets[len(close_brackets)]
                        incomplete_row_str = rows_str[last_open+1:]  # Skip the [
                        # Extract cells from incomplete row
                        incomplete_row = [cell.strip('"') for cell in re.findall(r'"([^"]*)"', incomplete_row_str)]
                        if incomplete_row and (not rows or incomplete_row != rows[-1]):
                            rows.append(incomplete_row)
                elem = {"headers": headers, "rows": rows}
                elements.append(elem)
        elif content_type == "heading":
            # Look for {"level": X, "text": "..."} patterns
            heading_pattern = r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*)"\}'
            heading_matches = re.finditer(heading_pattern, elements_str)
            for heading_match in heading_matches:
                level = int(heading_match.group(1))
                heading_text = heading_match.group(2)
                elements.append({"level": level, "text": heading_text})
        elif content_type in ["bullet_list", "numbered_list"]:
            # Look for {"items": [...]} patterns (handle incomplete JSON)
            # More flexible pattern that handles incomplete items array
            items_pattern = r'\{\s*"items"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
            items_matches = re.finditer(items_pattern, elements_str, re.DOTALL)
            for items_match in items_matches:
                items_str = items_match.group(1)
                # Extract all complete items (quoted strings)
                items = [item.strip('"') for item in re.findall(r'"([^"]*)"', items_str)]
                # Also check for incomplete last item (cut off mid-string)
                if items_str:
                    # Count quotes - odd number means incomplete item
                    quote_count = items_str.count('"')
                    if quote_count % 2 != 0:
                        # There's an incomplete item at the end
                        last_complete_quote = items_str.rfind('"', 0, items_str.rfind('"'))
                        if last_complete_quote >= 0:
                            incomplete_part = items_str[last_complete_quote+1:]
                            # Extract incomplete item (everything after last complete quote)
                            incomplete_item = incomplete_part.split(',')[0].strip('"')
                            if incomplete_item and incomplete_item not in items:
                                items.append(incomplete_item)
                if items:
                    elements.append({"items": items})
        elif content_type == "paragraph":
            # Look for {"text": "..."} patterns
            text_pattern = r'\{"text"\s*:\s*"([^"]*)"\}'
            text_matches = re.finditer(text_pattern, elements_str)
            for text_match in text_matches:
                para_text = text_match.group(1)
                elements.append({"text": para_text})
        if elements:
            sections.append({
                "id": section_id,
                "content_type": content_type,
                "elements": elements,
                "order": len(sections)
            })
    # If we found sections with preserved structure, return them
    if sections:
        return sections
    # Fallback: Original logic for when structure is completely broken
    # Pattern 1: Look for code_block {"code": "..."}
    code_items = re.findall(r'\{"code"\s*:\s*"([^"]*)"\}', text)
    incomplete_code_items = re.findall(r'\{"code"\s*:\s*"([^"]*?)(?:\n|$)', text)
    all_code_items = code_items + incomplete_code_items
    unique_code_items = list(dict.fromkeys([item for item in all_code_items if item.strip()]))
    if unique_code_items:
        unique_code_items = _removeLastIncompleteItem(unique_code_items, text)
        if unique_code_items:
            # Try to find section ID and language from original JSON
            section_id_match = re.search(r'"id"\s*:\s*"([^"]+)"', text)
            section_id = section_id_match.group(1) if section_id_match else "section_1"
            language_match = re.search(r'"language"\s*:\s*"([^"]+)"', text)
            language = language_match.group(1) if language_match else None
            elements = [{"code": item} for item in unique_code_items]
            if language and elements:
                elements[0]["language"] = language
            sections.append({
                "id": section_id,
                "content_type": "code_block",
                "elements": elements,
                "order": 1
            })
            return sections
    # Pattern 2: Look for list items {"text": "..."}, including incomplete ones
    list_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
    incomplete_list_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
    all_list_items = list_items + incomplete_list_items
    unique_list_items = list(dict.fromkeys([item for item in all_list_items if item.strip()]))
    if unique_list_items:
        unique_list_items = _removeLastIncompleteItem(unique_list_items, text)
        if unique_list_items:
            elements = [{"text": item} for item in unique_list_items]
            sections.append({
                "id": "section_1",
                "content_type": "list",
                "elements": elements,
                "order": 1
            })
            return sections
    # Pattern 3: Look for paragraph text {"text": "..."}, including incomplete ones
    if re.search(r'\{"text"\s*:\s*"[^"]*\}', text):
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
        incomplete_text_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
        all_text_items = text_items + incomplete_text_items
        unique_text_items = list(dict.fromkeys([item for item in all_text_items if item.strip()]))
        if unique_text_items:
            unique_text_items = _removeLastIncompleteItem(unique_text_items, text)
            if unique_text_items:
                elements = [{"text": item} for item in unique_text_items]
                sections.append({
                    "id": "section_1",
                    "content_type": "paragraph",
                    "elements": elements,
                    "order": 1
                })
                return sections
    # Pattern 4: Look for any quoted strings that might be content, including incomplete ones
    if re.search(r'"([^"]{3,})"', text):
        text_items = re.findall(r'"([^"]{3,})"', text)
        incomplete_text_items = re.findall(r'"([^"]{3,}?)(?:\n|$)', text)
        all_text_items = text_items + incomplete_text_items
        # Filter out strings that are obviously structural keys/ids, not content
        content_items = [item for item in all_text_items if not item.startswith(('section_', 'doc_', 'metadata', 'split_strategy', 'source_documents', 'extraction_method', 'id', 'content_type', 'elements', 'order', 'title', 'filename'))]
        if content_items:
            content_items = _removeLastIncompleteItem(content_items, text)
            if content_items:
                elements = [{"text": item} for item in content_items[:10]]
                sections.append({
                    "id": "section_1",
                    "content_type": "paragraph",
                    "elements": elements,
                    "order": 1
                })
                return sections
    # BUGFIX: explicit empty-list return instead of implicit None
    return sections
def extractSectionsFromDocument(documentData: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect every section from a parsed document payload.

    Supports two layouts: a top-level "documents" list whose entries each
    carry their own "sections" list, or a flat "sections" list at the root.
    Anything that is not a dict yields [].
    """
    if not isinstance(documentData, dict):
        return []
    # Nested layout takes precedence when the key is present, even if empty.
    if "documents" in documentData:
        collected: List[Dict[str, Any]] = []
        for entry in documentData.get("documents", []):
            if not isinstance(entry, dict) or "sections" not in entry:
                continue
            candidate = entry.get("sections", [])
            if isinstance(candidate, list):
                collected.extend(candidate)
        return collected
    # Flat layout: sections directly at the root.
    if "sections" in documentData:
        root_sections = documentData.get("sections", [])
        return root_sections if isinstance(root_sections, list) else []
    return []
def buildContinuationContext(allSections: List[Dict[str, Any]], lastRawResponse: Optional[str] = None) -> Dict[str, Any]:
    """
    Build context information from accumulated sections for continuation prompt.
    Returns summary of delivered data and cut-off point for continuation.
    Args:
        allSections: List of ALL sections accumulated across ALL iterations
        lastRawResponse: Raw JSON response from last iteration (can be broken/incomplete)
    Returns:
        Dict with section_count, delivered_summary, cut_off_element,
        element_before_cutoff and last_raw_json keys.
    """
    context = {
        "section_count": len(allSections),
    }
    # Build summary of delivered data (per-section counts)
    summary_lines = []
    summary_lines.append("Following data has already been delivered:\n")
    summary_items = []  # Collect items for truncation check
    for section in allSections:
        section_id = section.get("id")
        # CRITICAL: If section has no ID, omit it from summary
        if not section_id:
            continue
        content_type = section.get("content_type", "")
        elements = section.get("elements", [])
        # Each branch tolerates both list-of-dicts and single-dict "elements".
        if content_type == "heading":
            # Collect all heading elements with level and text
            heading_elements = []
            if isinstance(elements, list):
                for elem in elements:
                    if isinstance(elem, dict):
                        level = elem.get("level", "")
                        text = elem.get("text", "")
                        if text:
                            heading_elements.append(f"level {level}: {text}")
            elif isinstance(elements, dict):
                level = elements.get("level", "")
                text = elements.get("text", "")
                if text:
                    heading_elements.append(f"level {level}: {text}")
            if heading_elements:
                summary_items.append(f'- heading "{section_id}" {", ".join(heading_elements)}')
        elif content_type == "paragraph":
            # Count text elements
            text_count = 0
            if isinstance(elements, list):
                for elem in elements:
                    if isinstance(elem, dict) and elem.get("text"):
                        text_count += 1
            elif isinstance(elements, dict) and elements.get("text"):
                text_count = 1
            if text_count > 0:
                summary_items.append(f'- paragraph with {text_count} text(s)')
        elif content_type in ["bullet_list", "numbered_list"]:
            # Count items across all elements
            item_count = 0
            if isinstance(elements, list):
                for elem in elements:
                    if isinstance(elem, dict):
                        items = elem.get("items", [])
                        if isinstance(items, list):
                            item_count += len(items)
            elif isinstance(elements, dict):
                items = elements.get("items", [])
                if isinstance(items, list):
                    item_count = len(items)
            if item_count > 0:
                summary_items.append(f'- bullet_list with {item_count} items')
        elif content_type == "table":
            # Count rows across all elements
            row_count = 0
            if isinstance(elements, list):
                for elem in elements:
                    if isinstance(elem, dict):
                        rows = elem.get("rows", [])
                        if isinstance(rows, list):
                            row_count += len(rows)
            elif isinstance(elements, dict):
                rows = elements.get("rows", [])
                if isinstance(rows, list):
                    row_count = len(rows)
            if row_count > 0:
                summary_items.append(f'- table "{section_id}" with {row_count} rows')
        elif content_type == "code_block":
            # Count code lines across all elements (blank lines excluded)
            line_count = 0
            if isinstance(elements, list):
                for elem in elements:
                    if isinstance(elem, dict):
                        code = elem.get("code", "")
                        if code:
                            lines = [l for l in code.split('\n') if l.strip()]
                            line_count += len(lines)
            elif isinstance(elements, dict):
                code = elements.get("code", "")
                if code:
                    lines = [l for l in code.split('\n') if l.strip()]
                    line_count = len(lines)
            if line_count > 0:
                line_word = "line" if line_count == 1 else "lines"
                summary_items.append(f'- code_block "{section_id}" with {line_count} code {line_word}')
    # If no sections extracted but we have raw response, indicate that previous response was broken
    if len(summary_items) == 0 and lastRawResponse:
        summary_items.append("- Previous response was incomplete/broken JSON - please continue from where it stopped")
    # CRITICAL: If summary is too long, truncate: show first 10 and last 10 items
    if len(summary_items) > 20:
        first_10 = summary_items[:10]
        last_10 = summary_items[-10:]
        summary_lines.extend(first_10)
        summary_lines.append(f"... (truncated {len(summary_items) - 20} items) ...")
        summary_lines.extend(last_10)
    else:
        summary_lines.extend(summary_items)
    context["delivered_summary"] = "\n".join(summary_lines)
    # Extract cut-off point using new algorithm
    # 1. Loop over all sections until finding incomplete section
    # 2. In incomplete section, loop through elements until finding cut-off element
    # CRITICAL: There is always only ONE section incomplete (JSON cut-off point)
    cut_off_element = None
    element_before_cutoff = None
    if lastRawResponse:
        try:
            # CRITICAL: Always try to find incomplete section from raw JSON
            # Even if JSON can be parsed, it might be incomplete (cut off mid-element)
            raw_stripped = stripCodeFences(lastRawResponse.strip()).strip()
            # Check if response is just a fragment (not full JSON structure)
            # Fragments are continuation content that should be appended to the last incomplete element
            is_fragment = not (raw_stripped.strip().startswith('{') or raw_stripped.strip().startswith('['))
            if is_fragment:
                # Response is a fragment - it continues the last incomplete element
                # Find the last incomplete element from allSections
                if allSections:
                    last_section = allSections[-1]
                    elements = last_section.get("elements", [])
                    if isinstance(elements, list) and elements:
                        # Get the last element (which should be incomplete)
                        last_elem = elements[-1]
                        if isinstance(last_elem, dict):
                            # The fragment continues this element
                            # Show the fragment as cut_off_element
                            cut_off_element = raw_stripped
                            # Show the element before (if there is one)
                            if len(elements) > 1:
                                element_before_cutoff = json.dumps(elements[-2])
                            else:
                                element_before_cutoff = json.dumps(last_elem)
            else:
                # Response is full JSON - use standard extraction
                # Strategy 1: Try to find incomplete section using structured parsing
                incomplete_section = _findIncompleteSectionInRaw(raw_stripped)
                if incomplete_section:
                    cut_off_element, element_before_cutoff = _extractCutOffElements(incomplete_section, raw_stripped)
                # Strategy 2: If no incomplete section found, extract directly from raw JSON
                # This handles cases where JSON is cut off mid-element within a complete section
                if not cut_off_element:
                    cut_off_element, element_before_cutoff = _extractCutOffElementsFromRaw(raw_stripped, allSections)
        except Exception as e:
            # Cut-off detection is best-effort; a failure only degrades the prompt.
            logger.debug(f"Error extracting cut-off point: {e}")
    context["element_before_cutoff"] = element_before_cutoff
    context["cut_off_element"] = cut_off_element
    # Store raw JSON response for prompt builder to check
    if lastRawResponse:
        context["last_raw_json"] = lastRawResponse
    else:
        context["last_raw_json"] = ""
    return context
def _findIncompleteSectionInRaw(raw_json: str) -> Optional[Dict[str, Any]]:
    """
    Find the incomplete (cut-off) section in raw JSON text.

    CRITICAL: JSON can be cut off mid-element (e.g. {"text": "20327,20).
    We locate the sections array, collect every complete section object,
    and decide whether trailing content indicates an incomplete section.

    BUGFIX: complete section objects were parsed as
    ``json.loads('{' + section_str + '}')`` although ``section_str`` already
    spans '{'...'}', producing invalid '{{...}}' input — every parse failed
    silently, so this function could never return a section.  Sections are
    now parsed directly, and a cleanly closed sections array (']' at object
    depth 0) short-circuits to None.

    NOTE: brace-depth scanning does not account for braces inside string
    values; such content can still distort detection (pre-existing
    limitation of this heuristic).

    Returns:
        The last complete section dict when the sections array appears
        truncated, otherwise None.
    """
    try:
        if '"documents"' in raw_json:
            # Work from the last "documents" key to the end of the text.
            doc_start = raw_json.rfind('"documents"')
            if doc_start >= 0:
                doc_section = raw_json[doc_start:]
                sections_start = doc_section.find('"sections"')
                if sections_start >= 0:
                    sections_section = doc_section[sections_start:]
                    array_start = sections_section.find('[')
                    if array_start >= 0:
                        # Collect all complete top-level section objects.
                        section_objects = []
                        depth = 0
                        section_start = None
                        array_closed = False
                        for i in range(array_start + 1, len(sections_section)):
                            ch = sections_section[i]
                            if ch == '{':
                                if depth == 0:
                                    section_start = i
                                depth += 1
                            elif ch == '}':
                                depth -= 1
                                if depth == 0 and section_start is not None:
                                    # section_str is a complete {...} object.
                                    section_str = sections_section[section_start:i + 1]
                                    try:
                                        section_objects.append(json.loads(section_str))
                                    except Exception:
                                        pass
                                    section_start = None
                            elif ch == ']' and depth == 0:
                                # Sections array closed cleanly: nothing is incomplete.
                                array_closed = True
                                break
                        if array_closed:
                            return None
                        # Array never closed. If content trails after the last
                        # complete section (an incomplete element), report the
                        # last complete section as the cut-off section.
                        if section_objects:
                            last_section_end = sections_section.rfind('}')
                            if last_section_end >= 0:
                                remaining_after_last_section = sections_section[last_section_end + 1:].strip()
                                remaining_after_last_section = remaining_after_last_section.lstrip('],}')
                                if remaining_after_last_section and not remaining_after_last_section.startswith(']'):
                                    return section_objects[-1]
                        # Final check: if the sections array itself cannot be
                        # parsed, the last section is considered incomplete.
                        try:
                            json.loads(sections_section[array_start:])
                            return None
                        except Exception:
                            return section_objects[-1] if section_objects else None
    except Exception as e:
        logger.debug(f"Error finding incomplete section: {e}")
    return None
def _extractCutOffElements(incomplete_section: Dict[str, Any], raw_json: str) -> Tuple[Optional[str], Optional[str]]:
"""Extract cut-off element and element before from incomplete section."""
cut_off_element = None
element_before_cutoff = None
elements = incomplete_section.get("elements", [])
if not elements:
return None, None
# CRITICAL: In 99% of cases, JSON is cut off mid-string or mid-number
# Deliver the cut-off part AS-IS (don't try to "complete" it)
if isinstance(elements, list):
# Find last element (might be incomplete)
if elements:
# Edge case: If cut-off is in first element, just show cut-off element
if len(elements) == 1:
# Only one element - might be cut-off
last_elem = elements[0]
if isinstance(last_elem, dict):
# Check if element contains nested content (e.g., code_block with JSON string)
cut_off_element = _extractCutOffFromElement(last_elem, raw_json)
if not cut_off_element:
cut_off_element = json.dumps(last_elem)
else:
cut_off_element = str(last_elem)
else:
# Multiple elements - last one might be cut-off, get element before
element_before_cutoff = json.dumps(elements[-2]) if isinstance(elements[-2], dict) else str(elements[-2])
last_elem = elements[-1]
if isinstance(last_elem, dict):
# Check if element contains nested content
cut_off_element = _extractCutOffFromElement(last_elem, raw_json)
if not cut_off_element:
cut_off_element = json.dumps(last_elem)
else:
cut_off_element = str(last_elem)
elif isinstance(elements, dict):
# Single element - might be cut-off
cut_off_element = _extractCutOffFromElement(elements, raw_json)
if not cut_off_element:
cut_off_element = json.dumps(elements)
# If we couldn't extract from parsed structure, extract from raw JSON
if not cut_off_element:
# Extract the last incomplete part from raw JSON
# Find the last incomplete string/number/array
import re
# Look for incomplete string at the end
incomplete_match = re.search(r'"([^"]*?)(?:"|$)', raw_json[-500:], re.DOTALL)
if incomplete_match:
cut_off_element = incomplete_match.group(1)
else:
# Look for incomplete number
number_match = re.search(r'(\d+\.?\d*)(?:\s*[,}\]]|$)', raw_json[-200:])
if number_match:
cut_off_element = number_match.group(1)
return cut_off_element, element_before_cutoff
def _extractCutOffFromElement(element: Dict[str, Any], raw_json: str) -> Optional[str]:
"""
Extract cut-off point from within an element (e.g., code_block with JSON string, table with incomplete rows).
This helps identify where exactly to continue within nested structures.
"""
import re
# Check for code_block with nested JSON
if "code" in element:
code_content = element.get("code", "")
if isinstance(code_content, str) and code_content.strip().startswith("{"):
# This is JSON inside a code string - find where it was cut off
# Look for the last complete value in the raw JSON
# Find the code string in raw JSON
code_match = re.search(r'"code"\s*:\s*"([^"]*?)(?:"|$)', raw_json[-2000:], re.DOTALL)
if code_match:
code_str = code_match.group(1)
# Try to find the last complete value in the JSON string
# Look for patterns like: [2, 3, 5, ... 17929, (cut off here)
array_match = re.search(r'\[([^\]]*?)(?:\]|$)', code_str, re.DOTALL)
if array_match:
array_content = array_match.group(1)
# Find last complete number/item
# Match: number followed by comma or end
last_complete = re.findall(r'(\d+)\s*[,]', array_content)
if last_complete:
last_num = last_complete[-1]
# Return context showing where to continue
return f'{{"code": "{{\\"primes\\": [... up to {last_num}, <CONTINUE FROM HERE>]"}}'
# Check for table with incomplete rows
if "rows" in element:
rows = element.get("rows", [])
if isinstance(rows, list) and rows:
# Find last complete row in raw JSON
rows_str = str(rows)
# Try to find where rows were cut off
last_row_match = re.search(r'\[([^\]]*?)(?:\]|$)', raw_json[-1000:], re.DOTALL)
if last_row_match:
return f'{{"rows": [... last complete row shown above, <CONTINUE FROM HERE>]}}'
# Check for list items
if "items" in element:
items = element.get("items", [])
if isinstance(items, list) and items:
# Find last complete item
last_item_match = re.search(r'"([^"]*?)"\s*(?:,|\])', raw_json[-1000:], re.DOTALL)
if last_item_match:
return f'{{"items": [... last item shown above, <CONTINUE FROM HERE>]}}'
return None
def _extractCutOffElementsFromRaw(raw_json: str, allSections: List[Dict[str, Any]]) -> Tuple[Optional[str], Optional[str]]:
    """
    Extract cut-off element directly from raw JSON when section parsing fails.

    Scans forward from the LAST ``"elements"`` key in the raw text,
    re-tokenizes element objects by brace depth (string- and escape-aware),
    then decides which element was truncated and where.

    This handles ALL cases where JSON is cut off:
    - Mid-element (incomplete element object)
    - Mid-string/number within an element
    - Mid-array within an element (e.g., rows in table, items in list)
    - Mid-nested structure
    CRITICAL: In 99% of cases, JSON is cut off mid-string or mid-number - deliver as-is.

    Args:
        raw_json: The raw (possibly truncated) JSON text.
        allSections: Parsed sections; not referenced by this heuristic,
            kept for interface parity with the section-based extractor.

    Returns:
        Tuple of (cut_off_element, element_before_cutoff); either may be None.
    """
    cut_off_element: Optional[str] = None
    element_before_cutoff: Optional[str] = None
    try:
        # Find the last "elements" array in raw JSON
        if '"elements"' in raw_json:
            # Find the last occurrence of "elements"
            last_elements_pos = raw_json.rfind('"elements"')
            if last_elements_pos >= 0:
                elements_section = raw_json[last_elements_pos:]
                # Find the array start '['
                array_start = elements_section.find('[')
                if array_start >= 0:
                    # Use a simpler approach: find all element objects by tracking braces
                    # This works even if elements contain nested arrays/objects
                    element_strings: List[str] = []
                    depth = 0
                    in_string = False
                    escape_next = False
                    elem_start: Optional[int] = None
                    for i in range(array_start, len(elements_section)):
                        char = elements_section[i]
                        # Track string state (ignore brackets/braces inside strings)
                        if escape_next:
                            escape_next = False
                            continue
                        if char == '\\':
                            escape_next = True
                            continue
                        # NOTE(review): escape_next is always False at this point
                        # (handled above), so the extra check is redundant but harmless.
                        if char == '"' and not escape_next:
                            in_string = not in_string
                            continue
                        if not in_string:
                            if char == '{':
                                if depth == 0:
                                    # Depth 0 -> this '{' opens a top-level element.
                                    elem_start = i
                                depth += 1
                            elif char == '}':
                                depth -= 1
                                if depth == 0 and elem_start is not None:
                                    # Found complete element (all braces closed, even if nested arrays are incomplete)
                                    elem_str = elements_section[elem_start:i+1]
                                    element_strings.append(elem_str)
                                    elem_start = None
                    # Now analyze what we found
                    if element_strings:
                        last_elem = element_strings[-1]
                        last_complete_pos = elements_section.rfind('}')
                        # Check if there's content after the last complete element
                        if last_complete_pos >= 0:
                            remaining = elements_section[last_complete_pos+1:].strip()
                            # Strip separators and closing delimiters before testing
                            # whether real (truncated) content follows.
                            remaining_clean = remaining.lstrip(',').strip().lstrip(']').strip()
                            # Case 1: Incomplete element after last complete one
                            if remaining_clean and not remaining_clean.startswith(']'):
                                incomplete_start = last_complete_pos + 1
                                # Skip whitespace and the element separator comma.
                                while incomplete_start < len(elements_section) and elements_section[incomplete_start] in ' \n\t\r,':
                                    incomplete_start += 1
                                if incomplete_start < len(elements_section):
                                    incomplete_elem_str = elements_section[incomplete_start:].strip()
                                    incomplete_elem_str = incomplete_elem_str.rstrip(']').rstrip('}').rstrip()
                                    cut_off_element = incomplete_elem_str
                                    element_before_cutoff = element_strings[-1]
                            # Case 2: Last element itself is incomplete (cut off in nested structure like rows, items, etc.)
                            else:
                                # Check if JSON is incomplete by analyzing structure
                                # Count unclosed brackets/braces in elements section (ignoring strings)
                                elements_section_braces = 0
                                elements_section_brackets = 0
                                in_str = False
                                esc = False
                                for char in elements_section:
                                    if esc:
                                        esc = False
                                        continue
                                    if char == '\\':
                                        esc = True
                                        continue
                                    if char == '"':
                                        in_str = not in_str
                                        continue
                                    if not in_str:
                                        if char == '{':
                                            elements_section_braces += 1
                                        elif char == '}':
                                            elements_section_braces -= 1
                                        elif char == '[':
                                            elements_section_brackets += 1
                                        elif char == ']':
                                            elements_section_brackets -= 1
                                # Also check raw JSON for unclosed structures
                                # (same string-aware counting, over the whole document).
                                raw_braces = 0
                                raw_brackets = 0
                                in_str = False
                                esc = False
                                for char in raw_json:
                                    if esc:
                                        esc = False
                                        continue
                                    if char == '\\':
                                        esc = True
                                        continue
                                    if char == '"':
                                        in_str = not in_str
                                        continue
                                    if not in_str:
                                        if char == '{':
                                            raw_braces += 1
                                        elif char == '}':
                                            raw_braces -= 1
                                        elif char == '[':
                                            raw_brackets += 1
                                        elif char == ']':
                                            raw_brackets -= 1
                                # Check if last element can be parsed
                                last_elem_parsable = False
                                try:
                                    json.loads(last_elem)
                                    last_elem_parsable = True
                                except:
                                    pass
                                # Determine if last element is incomplete
                                is_incomplete = False
                                # If there are unclosed structures, element is incomplete
                                if elements_section_brackets > 0 or elements_section_braces > 0 or raw_brackets > 0 or raw_braces > 0:
                                    is_incomplete = True
                                # If element cannot be parsed, it's incomplete
                                elif not last_elem_parsable:
                                    is_incomplete = True
                                # Check if JSON ends mid-element by finding where element ends in raw JSON
                                elif last_elem_parsable:
                                    # Find where this element ends in the raw JSON
                                    # Use the element's tail as a search anchor; 100 chars is a heuristic anchor length.
                                    elem_end_marker = last_elem[-100:] if len(last_elem) > 100 else last_elem
                                    elem_end_in_raw = raw_json.rfind(elem_end_marker)
                                    if elem_end_in_raw >= 0:
                                        actual_elem_end = elem_end_in_raw + len(last_elem)
                                        if actual_elem_end < len(raw_json):
                                            remaining_after_elem = raw_json[actual_elem_end:].strip()
                                            remaining_clean = remaining_after_elem.lstrip(',').strip()
                                            # If there's unexpected content, element is incomplete
                                            if remaining_clean and not remaining_clean.startswith(']'):
                                                is_incomplete = True
                                if is_incomplete:
                                    cut_off_element = last_elem
                                    if len(element_strings) >= 2:
                                        element_before_cutoff = element_strings[-2]
                                    elif len(element_strings) == 1:
                                        # Single element: report it as both cut-off and predecessor.
                                        element_before_cutoff = last_elem
                    # Case 3: No complete elements found, but there's an incomplete one
                    elif elem_start is not None:
                        # There's an incomplete element that hasn't been closed
                        incomplete_elem_str = elements_section[elem_start:].strip()
                        cut_off_element = incomplete_elem_str
                        # No element before (this is the first/only element)
                        element_before_cutoff = None
    except Exception as e:
        logger.debug(f"Error extracting cut-off elements from raw JSON: {e}")
    return cut_off_element, element_before_cutoff
def parseJsonWithModel(jsonString: str, modelClass: Type[T]) -> T:
    """
    Parse JSON string using Pydantic model with error handling.

    Pipeline (all helpers live in this module):
    - extractJsonString() - Extracts JSON from text with code fences
    - tryParseJson() - Safe parsing with error handling
    - repairBrokenJson() - Repairs broken/incomplete JSON

    Args:
        jsonString: JSON string to parse (may contain code fences, extra text, etc.)
        modelClass: Pydantic model class to parse into

    Returns:
        Parsed Pydantic model instance

    Raises:
        ValueError: If JSON cannot be parsed or validated
    """
    def buildModel(data: Any) -> T:
        # Shared dict/list -> model construction for both parsing paths.
        # A list is handled by parsing its first item.
        if isinstance(data, list):
            if not data:
                raise ValueError(f"Empty list cannot be parsed as {modelClass.__name__}")
            data = data[0]
        if not isinstance(data, dict):
            raise ValueError(f"Parsed JSON is not a dict or list: {type(data)}")
        return modelClass(**data)

    if not jsonString:
        raise ValueError(f"Cannot parse empty JSON string for {modelClass.__name__}")
    # Step 1: Extract JSON string (handles code fences, extra text)
    extractedJson = extractJsonString(jsonString)
    if not extractedJson or extractedJson.strip() == "":
        raise ValueError(f"No JSON found in string for {modelClass.__name__}")
    # Step 2: Try to parse as JSON
    parsedJson, error, cleaned = tryParseJson(extractedJson)
    if error is None and parsedJson is not None:
        try:
            return buildModel(parsedJson)
        except ValidationError as e:
            logger.error(f"Validation error parsing {modelClass.__name__}: {e}")
            raise ValueError(f"Invalid data for {modelClass.__name__}: {e}")
        except Exception as e:
            logger.error(f"Error creating {modelClass.__name__} instance: {e}")
            raise ValueError(f"Failed to create {modelClass.__name__} instance: {e}")
    # Step 3: Try to repair broken JSON
    logger.warning(f"Initial JSON parsing failed, attempting repair for {modelClass.__name__}")
    repairedJson = repairBrokenJson(extractedJson)
    if repairedJson:
        # repairBrokenJson() already returns a parsed Python object, so use it
        # directly. The previous json.dumps()/re-parse round-trip was redundant
        # and could raise an uncaught TypeError on non-serializable values.
        try:
            return buildModel(repairedJson)
        except ValidationError as e:
            logger.error(f"Validation error parsing repaired {modelClass.__name__}: {e}")
            raise ValueError(f"Invalid repaired data for {modelClass.__name__}: {e}")
        except Exception as e:
            # Best-effort repair path: log and fall through to the final error.
            logger.error(f"Error creating {modelClass.__name__} from repaired JSON: {e}")
    # Step 4: All parsing failed
    logger.error(f"Failed to parse JSON for {modelClass.__name__}. Cleaned JSON preview: {cleaned[:200]}...")
    raise ValueError(f"Failed to parse or validate JSON for {modelClass.__name__}. JSON may be malformed or incomplete.")