gateway/modules/shared/jsonUtils.py

964 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple, Union, Type, TypeVar
from pydantic import BaseModel, ValidationError
from modules.datamodels.datamodelAi import ContinuationContext
logger = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseModel)
def stripCodeFences(text: str) -> str:
"""Remove ```json / ``` fences and surrounding whitespace if present.
Also removes [SOURCE: ...] and [END SOURCE] tags that may wrap the JSON."""
if not text:
return text
s = text.strip()
# Remove [SOURCE: ...] tags at the beginning
if s.startswith("[SOURCE:"):
# Find the end of the SOURCE tag (newline or end of string)
end_pos = s.find("\n")
if end_pos != -1:
s = s[end_pos+1:]
else:
# No newline, entire string is SOURCE tag
return ""
# Remove [END SOURCE] tags at the end
if s.endswith("[END SOURCE]"):
# Find the start of END SOURCE tag (newline before it)
start_pos = s.rfind("\n[END SOURCE]")
if start_pos != -1:
s = s[:start_pos]
else:
# No newline, entire string is END SOURCE tag
return ""
# Handle opening fence (may or may not have closing fence)
if s.startswith("```"):
# Remove first triple backticks
# Commonly starts with ```json\n
i = 3
# Skip optional language tag like 'json'
while i < len(s) and s[i] != '\n':
i += 1
if i < len(s) and s[i] == '\n':
s = s[i+1:]
# Strip trailing ``` if present
if s.endswith("```"):
s = s[:-3]
return s.strip()
return s
def extractFirstBalancedJson(text: str) -> str:
"""Return the first balanced JSON object/array substring; otherwise return trimmed input."""
if not text:
return text
s = text.strip()
# Find first '{' or '['
brace = s.find('{')
bracket = s.find('[')
start = -1
if brace != -1 and (bracket == -1 or brace < bracket):
start = brace
elif bracket != -1:
start = bracket
if start == -1:
return s
# Scan for matching close using a simple stack
stack: List[str] = []
for i in range(start, len(s)):
ch = s[i]
if ch in '{[':
stack.append(ch)
elif ch in '}]':
if not stack:
continue
opener = stack.pop()
if (opener == '{' and ch != '}') or (opener == '[' and ch != ']'):
continue
if not stack:
return s[start:i+1].strip()
return s
def normalizeJsonText(text: str) -> str:
"""Light normalization: remove BOM, normalize smart quotes."""
if not text:
return text
s = text
# Remove UTF-8 BOM if present
if s.startswith('\ufeff'):
s = s.lstrip('\ufeff')
# Normalize smart quotes to straight quotes
s = s.replace('', '"').replace('', '"').replace('', "'").replace('', "'")
return s
def extractJsonString(text: str) -> str:
"""Strip code fences, normalize, then extract first balanced JSON substring."""
s = normalizeJsonText(text)
s = stripCodeFences(s)
s = extractFirstBalancedJson(s)
return s.strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
"""Extract and parse JSON; return (obj, error, cleaned_str)."""
if isinstance(text, bytes):
try:
text = text.decode('utf-8', errors='replace')
except Exception:
text = str(text)
cleaned = extractJsonString(text or "")
try:
return json.loads(cleaned), None, cleaned
except Exception as e:
return None, e, cleaned
def repairBrokenJson(text: str) -> Optional[Dict[str, Any]]:
"""
Attempt to repair broken JSON using multiple strategies.
Generic solution that works for any content type.
Returns the best repair attempt or None if all fail.
IMPORTANT: This function tries to preserve ALL data by avoiding truncation.
Only uses truncation as a last resort when structure closing fails.
"""
if not text:
return None
# Strategy 1: Structure closing - close incomplete structures WITHOUT truncating
# This preserves all data and should be tried first
closedStr = closeJsonStructures(text)
obj, err, _ = tryParseJson(closedStr)
if err is None and isinstance(obj, dict):
sections = extractSectionsFromDocument(obj)
if sections:
logger.info(f"Repaired JSON using structure closing (preserved all data, found {len(sections)} sections)")
return obj
else:
# Structure closing worked but no sections found - still return it
logger.info("Repaired JSON using structure closing (preserved all data, but no sections found)")
return obj
# Strategy 2: Try to extract sections from the entire text using regex
# This handles cases where the JSON structure is broken but content is intact
# NOTE: _extractSectionsRegex may truncate, but we try it before progressive parsing
extractedSections = _extractSectionsRegex(text)
if extractedSections:
logger.info(f"Extracted {len(extractedSections)} sections using regex")
return {
"metadata": {
"split_strategy": "single_document",
"source_documents": [],
"extraction_method": "ai_generation"
},
"documents": [{"sections": extractedSections}]
}
# Strategy 3: Progressive parsing - try to find longest valid prefix (TRUNCATES DATA)
# WARNING: This strategy truncates the input and loses data after the truncation point
# Only use as last resort when other strategies fail
logger.warning("Structure closing and regex extraction failed, trying progressive parsing (WILL TRUNCATE DATA)")
bestResult = None
bestValidLength = 0
# Try different step sizes to find the best valid JSON
for stepSize in [100, 50, 10, 1]:
for i in range(len(text), 0, -stepSize):
testStr = text[:i]
closedStr = closeJsonStructures(testStr)
obj, err, _ = tryParseJson(closedStr)
if err is None and isinstance(obj, dict):
bestResult = obj
bestValidLength = i
logger.debug(f"Progressive parsing success at length {i} (step: {stepSize}) - DATA TRUNCATED AT POSITION {i}")
break
if bestResult:
break
if bestResult:
logger.warning(f"Repaired JSON using progressive parsing (valid length: {bestValidLength}, DATA LOST AFTER THIS POINT)")
# Check if we have sections in the result
sections = extractSectionsFromDocument(bestResult)
if sections:
logger.info(f"Progressive parsing found {len(sections)} sections")
return bestResult
else:
# No sections found in progressive parsing, try to extract from broken part
logger.info("Progressive parsing found no sections, trying to extract from broken part")
extractedSections = _extractSectionsRegex(text[bestValidLength:])
if extractedSections:
logger.info(f"Extracted {len(extractedSections)} sections from broken part")
# Merge with the valid part
if "documents" not in bestResult:
bestResult["documents"] = []
if not bestResult["documents"]:
bestResult["documents"] = [{"sections": []}]
bestResult["documents"][0]["sections"].extend(extractedSections)
return bestResult
logger.warning("All repair strategies failed")
return None
def closeJsonStructures(text: str) -> str:
"""
Close incomplete JSON structures generically and correctly.
Generic approach:
1. Close unterminated strings (if odd number of quotes)
2. Track structure opening order with stack (LIFO)
3. Close structures in reverse order (last opened, first closed)
4. Remove trailing commas only directly before closing brackets/braces
"""
if not text:
return text
result = text
# Step 1: Close unterminated strings
# Simple: if odd number of quotes, find last unescaped quote and close it
quoteCount = result.count('"')
if quoteCount % 2 == 1:
# Find last unescaped quote
i = len(result) - 1
while i >= 0:
if result[i] == '"':
# Count backslashes before quote
escapeCount = 0
j = i - 1
while j >= 0 and result[j] == '\\':
escapeCount += 1
j -= 1
# If even number of backslashes, quote is not escaped
if escapeCount % 2 == 0:
result += '"'
break
i -= 1
# Step 2: Track structure opening order with stack
stack = []
inString = False
escapeNext = False
for char in result:
if escapeNext:
escapeNext = False
continue
if char == '\\':
escapeNext = True
continue
if char == '"':
inString = not inString
continue
# Only track braces/brackets outside of strings
if not inString:
if char == '{':
stack.append('}')
elif char == '[':
stack.append(']')
elif char == '}' or char == ']':
# Pop matching closing bracket/brace from stack
if stack and stack[-1] == char:
stack.pop()
# Step 3: Close remaining structures in reverse order (LIFO)
# Remove trailing comma ONLY directly before each closing bracket/brace
while stack:
closingChar = stack.pop()
result = result.rstrip()
# Remove trailing comma if present (invalid before closing)
if result and result[-1] == ',':
result = result[:-1].rstrip()
result += closingChar
return result
def _extractSectionsRegex(text: str) -> List[Dict[str, Any]]:
"""
Extract sections from broken/incomplete JSON using structural parsing.
ROBUST APPROACH: Uses JSON repair and parsing instead of fragile regex patterns.
Works for any content type, nested structures, and incomplete JSON.
NOTE: This function is called FROM repairBrokenJson, so it must NOT call repairBrokenJson
to avoid circular dependency. Instead, it implements its own repair strategies.
IMPORTANT: Tries to preserve data by using structure closing first before truncation.
"""
sections = []
# Strategy 1: Try structure closing WITHOUT truncation first (preserves all data)
closed_str = closeJsonStructures(text)
obj, err, _ = tryParseJson(closed_str)
if err is None and isinstance(obj, dict):
extracted_sections = extractSectionsFromDocument(obj)
if extracted_sections:
logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections using structure closing (preserved all data)")
return extracted_sections
# Strategy 2: Try progressive parsing to find longest valid JSON prefix (TRUNCATES DATA)
# WARNING: This truncates the input and loses data
# Only use if structure closing failed
logger.debug("_extractSectionsRegex: Structure closing failed, trying progressive parsing (WILL TRUNCATE)")
best_result = None
best_valid_length = 0
for step_size in [1000, 500, 100, 50, 10]:
for i in range(len(text), 0, -step_size):
test_str = text[:i]
closed_str = closeJsonStructures(test_str)
obj, err, _ = tryParseJson(closed_str)
if err is None and isinstance(obj, dict):
extracted_sections = extractSectionsFromDocument(obj)
if extracted_sections:
logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections using progressive parsing at length {i} (DATA TRUNCATED)")
return extracted_sections
# Store best result even if no sections found
if not best_result:
best_result = obj
best_valid_length = i
# Strategy 2: Try to find balanced JSON and parse it
balanced_json_str = extractFirstBalancedJson(text)
if balanced_json_str and balanced_json_str != text.strip():
obj, err, _ = tryParseJson(balanced_json_str)
if err is None and isinstance(obj, dict):
extracted_sections = extractSectionsFromDocument(obj)
if extracted_sections:
logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections from balanced JSON")
return extracted_sections
# Strategy 3: If we found a valid JSON object but no sections, try to extract sections from it
if best_result:
extracted_sections = extractSectionsFromDocument(best_result)
if extracted_sections:
logger.debug(f"_extractSectionsRegex: Extracted {len(extracted_sections)} sections from best result")
return extracted_sections
# Strategy 4: Last resort - try generic content extraction (only if nothing else worked)
logger.debug(f"_extractSectionsRegex: All structural parsing failed, trying generic content extraction")
sections = _extractGenericContent(text)
if sections:
logger.debug(f"_extractSectionsRegex: Generic content extraction found {len(sections)} sections")
return sections
def _removeLastIncompleteItem(items: List[str], original_text: str) -> List[str]:
"""
Remove the last item if it appears to be incomplete/corrupted.
This prevents corrupted data from being included in the final result.
"""
# re is already imported at module level
if not items:
return items
# Check if the original text ends with incomplete JSON patterns
# Look for patterns that suggest the last item was cut off
# Pattern 1: Text ends with incomplete string like {"text": "36
if re.search(r'\{"[^"]*"\s*:\s*"[^"]*$', original_text):
logger.debug("Detected incomplete string at end - removing last item")
return items[:-1]
# Pattern 2: Text ends with incomplete boolean like {"bool_flag": tr
if re.search(r'\{"[^"]*"\s*:\s*(true|false|tr|fa)$', original_text):
logger.debug("Detected incomplete boolean at end - removing last item")
return items[:-1]
# Pattern 3: Text ends with incomplete number like {"number": 123
if re.search(r'\{"[^"]*"\s*:\s*\d+$', original_text):
logger.debug("Detected incomplete number at end - removing last item")
return items[:-1]
# Pattern 4: Text ends with incomplete array like {"array": [1,2,3
if re.search(r'\{"[^"]*"\s*:\s*\[[^\]]*$', original_text):
logger.debug("Detected incomplete array at end - removing last item")
return items[:-1]
# Pattern 5: Text ends with incomplete object like {"obj": {"key": "val
if re.search(r'\{"[^"]*"\s*:\s*\{[^}]*$', original_text):
logger.debug("Detected incomplete object at end - removing last item")
return items[:-1]
# Pattern 6: Text ends with trailing comma (common sign of incomplete JSON)
if original_text.rstrip().endswith(','):
logger.debug("Detected trailing comma - removing last item")
return items[:-1]
# If no incomplete patterns detected, return all items
return items
def _extractGenericContent(text: str) -> List[Dict[str, Any]]:
"""
Extract generic content when no specific section patterns are found.
This handles cases where the JSON structure is completely broken.
Handles incomplete strings and corrupted data.
Excludes the last incomplete item to prevent corrupted data.
CRITICAL: Must preserve original content_type and id from the JSON structure!
"""
# re is already imported at module level
sections = []
# CRITICAL: First, try to extract the original section structure from the JSON
# Look for section patterns with content_type and id preserved
# Handle both complete and incomplete JSON (may be cut off mid-string)
# More flexible pattern that handles incomplete structures
section_pattern = r'"sections"\s*:\s*\[\s*\{[^}]*?"id"\s*:\s*"([^"]+)"[^}]*?"content_type"\s*:\s*"([^"]+)"[^}]*?"elements"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
section_matches = re.finditer(section_pattern, text, re.DOTALL)
for match in section_matches:
section_id = match.group(1)
content_type = match.group(2)
elements_str = match.group(3)
# Extract elements based on content_type
elements = []
if content_type == "code_block":
# Look for {"code": "..."} patterns (complete)
code_pattern = r'\{"code"\s*:\s*"([^"]*)"(?:\s*,\s*"language"\s*:\s*"([^"]*)")?\}'
code_matches = re.finditer(code_pattern, elements_str, re.DOTALL)
for code_match in code_matches:
code = code_match.group(1)
language = code_match.group(2) if code_match.lastindex >= 2 else None
elem = {"code": code}
if language:
elem["language"] = language
elements.append(elem)
# Also look for incomplete code blocks (cut off mid-string)
# Pattern: {"code": "..." where string is not closed
incomplete_code_pattern = r'\{"code"\s*:\s*"([^"]*?)(?:"|$)'
incomplete_matches = re.finditer(incomplete_code_pattern, elements_str, re.DOTALL)
for inc_match in incomplete_matches:
code = inc_match.group(1)
# Check if this code is already in elements (from complete match)
if code and code not in [e.get("code", "")[:len(code)] for e in elements]:
# Extract language if present before the cut-off
language_match = re.search(r'"language"\s*:\s*"([^"]+)"', elements_str[:inc_match.end()])
language = language_match.group(1) if language_match else None
elem = {"code": code}
if language:
elem["language"] = language
elements.append(elem)
# If still no elements found, try to extract code from the raw elements string
# This handles cases where the JSON is very broken
if not elements:
# Look for any "code": "..." pattern, even if incomplete
raw_code_pattern = r'"code"\s*:\s*"([^"]*)"'
raw_code_matches = re.finditer(raw_code_pattern, elements_str, re.DOTALL)
for raw_match in raw_code_matches:
code = raw_match.group(1)
if code:
elements.append({"code": code})
# If still nothing, try to find incomplete code string
if not elements:
incomplete_raw_pattern = r'"code"\s*:\s*"([^"]*?)(?:"|$)'
incomplete_raw_matches = re.finditer(incomplete_raw_pattern, elements_str, re.DOTALL)
for inc_raw_match in incomplete_raw_matches:
code = inc_raw_match.group(1)
if code:
elements.append({"code": code})
elif content_type == "table":
# Look for table elements with rows (handle incomplete JSON)
# Pattern: {"headers": [...], "rows": [...]} or incomplete version
# More flexible pattern that handles incomplete rows array
# Match even if rows array is not closed
table_pattern = r'\{\s*"headers"\s*:\s*\[([^\]]*)\]\s*,\s*"rows"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
table_matches = re.finditer(table_pattern, elements_str, re.DOTALL)
for table_match in table_matches:
headers_str = table_match.group(1)
rows_str = table_match.group(2)
# Parse headers
headers = [h.strip('"') for h in re.findall(r'"([^"]*)"', headers_str)]
# Parse rows (may be incomplete - handle cut-off)
rows = []
# Find all complete row arrays: ["...", "..."]
row_pattern = r'\[([^\]]*)\]'
row_matches = list(re.finditer(row_pattern, rows_str))
for row_match in row_matches:
row_str = row_match.group(1)
row = [cell.strip('"') for cell in re.findall(r'"([^"]*)"', row_str)]
if row:
rows.append(row)
# Also check for incomplete last row (cut off mid-row)
# Look for pattern like ["cell1", "cell2", "incomplete
# Find the last occurrence of [ that doesn't have a matching ]
if rows_str:
# Find all [ positions
open_brackets = [i for i, char in enumerate(rows_str) if char == '[']
close_brackets = [i for i, char in enumerate(rows_str) if char == ']']
# If there are more [ than ], we have an incomplete row
if len(open_brackets) > len(close_brackets):
# Find the last [ that doesn't have a matching ]
last_open = open_brackets[len(close_brackets)]
incomplete_row_str = rows_str[last_open+1:] # Skip the [
# Extract cells from incomplete row
incomplete_row = [cell.strip('"') for cell in re.findall(r'"([^"]*)"', incomplete_row_str)]
if incomplete_row and (not rows or incomplete_row != rows[-1]):
rows.append(incomplete_row)
elem = {"headers": headers, "rows": rows}
elements.append(elem)
elif content_type == "heading":
# Look for {"level": X, "text": "..."} patterns
heading_pattern = r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*)"\}'
heading_matches = re.finditer(heading_pattern, elements_str)
for heading_match in heading_matches:
level = int(heading_match.group(1))
text = heading_match.group(2)
elements.append({"level": level, "text": text})
elif content_type in ["bullet_list", "numbered_list"]:
# Look for {"items": [...]} patterns (handle incomplete JSON)
# Pattern: {"items": [...]} or incomplete version
# More flexible pattern that handles incomplete items array
items_pattern = r'\{\s*"items"\s*:\s*\[(.*?)(?:\]\s*\}|$)'
items_matches = re.finditer(items_pattern, elements_str, re.DOTALL)
for items_match in items_matches:
items_str = items_match.group(1)
# Extract all complete items (quoted strings)
items = [item.strip('"') for item in re.findall(r'"([^"]*)"', items_str)]
# Also check for incomplete last item (cut off mid-string)
# Find the last occurrence of " that doesn't have a matching "
if items_str:
# Count quotes - odd number means incomplete item
quote_count = items_str.count('"')
if quote_count % 2 != 0:
# There's an incomplete item at the end
# Find the last complete item and the incomplete part
last_complete_quote = items_str.rfind('"', 0, items_str.rfind('"'))
if last_complete_quote >= 0:
incomplete_part = items_str[last_complete_quote+1:]
# Extract incomplete item (everything after last complete quote)
incomplete_item = incomplete_part.split(',')[0].strip('"')
if incomplete_item and incomplete_item not in items:
items.append(incomplete_item)
if items:
elements.append({"items": items})
elif content_type == "paragraph":
# Look for {"text": "..."} patterns
text_pattern = r'\{"text"\s*:\s*"([^"]*)"\}'
text_matches = re.finditer(text_pattern, elements_str)
for text_match in text_matches:
text = text_match.group(1)
elements.append({"text": text})
if elements:
sections.append({
"id": section_id,
"content_type": content_type,
"elements": elements,
"order": len(sections)
})
# If we found sections with preserved structure, return them
if sections:
return sections
# Fallback: Original logic for when structure is completely broken
# Look for any structured content patterns
# Pattern 1: Look for code_block {"code": "..."}
code_items = re.findall(r'\{"code"\s*:\s*"([^"]*)"\}', text)
incomplete_code_items = re.findall(r'\{"code"\s*:\s*"([^"]*?)(?:\n|$)', text)
all_code_items = code_items + incomplete_code_items
unique_code_items = list(dict.fromkeys([item for item in all_code_items if item.strip()]))
if unique_code_items:
unique_code_items = _removeLastIncompleteItem(unique_code_items, text)
if unique_code_items:
# Try to find section ID and language from original JSON
section_id_match = re.search(r'"id"\s*:\s*"([^"]+)"', text)
section_id = section_id_match.group(1) if section_id_match else "section_1"
language_match = re.search(r'"language"\s*:\s*"([^"]+)"', text)
language = language_match.group(1) if language_match else None
elements = [{"code": item} for item in unique_code_items]
if language and elements:
elements[0]["language"] = language
sections.append({
"id": section_id,
"content_type": "code_block",
"elements": elements,
"order": 1
})
return sections
# Pattern 2: Look for list items {"text": "..."}, including incomplete ones
list_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
incomplete_list_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
all_list_items = list_items + incomplete_list_items
unique_list_items = list(dict.fromkeys([item for item in all_list_items if item.strip()]))
if unique_list_items:
unique_list_items = _removeLastIncompleteItem(unique_list_items, text)
if unique_list_items:
elements = [{"text": item} for item in unique_list_items]
sections.append({
"id": "section_1",
"content_type": "list",
"elements": elements,
"order": 1
})
return sections
# Pattern 3: Look for paragraph text {"text": "..."}, including incomplete ones
if re.search(r'\{"text"\s*:\s*"[^"]*\}', text):
text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
incomplete_text_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
all_text_items = text_items + incomplete_text_items
unique_text_items = list(dict.fromkeys([item for item in all_text_items if item.strip()]))
if unique_text_items:
unique_text_items = _removeLastIncompleteItem(unique_text_items, text)
if unique_text_items:
elements = [{"text": item} for item in unique_text_items]
sections.append({
"id": "section_1",
"content_type": "paragraph",
"elements": elements,
"order": 1
})
return sections
# Pattern 4: Look for any quoted strings that might be content, including incomplete ones
if re.search(r'"([^"]{3,})"', text):
text_items = re.findall(r'"([^"]{3,})"', text)
incomplete_text_items = re.findall(r'"([^"]{3,}?)(?:\n|$)', text)
all_text_items = text_items + incomplete_text_items
content_items = [item for item in all_text_items if not item.startswith(('section_', 'doc_', 'metadata', 'split_strategy', 'source_documents', 'extraction_method', 'id', 'content_type', 'elements', 'order', 'title', 'filename'))]
if content_items:
content_items = _removeLastIncompleteItem(content_items, text)
if content_items:
elements = [{"text": item} for item in content_items[:10]]
sections.append({
"id": "section_1",
"content_type": "paragraph",
"elements": elements,
"order": 1
})
return sections
def extractSectionsFromDocument(documentData: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extract all sections from document data structure.
Handles both flat and nested document structures.
"""
if not isinstance(documentData, dict):
return []
# Try to extract sections from documents array
if "documents" in documentData:
all_sections = []
for doc in documentData.get("documents", []):
if isinstance(doc, dict) and "sections" in doc:
sections = doc.get("sections", [])
if isinstance(sections, list):
all_sections.extend(sections)
return all_sections
# Try to extract sections directly from root
if "sections" in documentData:
sections = documentData.get("sections", [])
if isinstance(sections, list):
return sections
return []
def buildContinuationContext(
allSections: List[Dict[str, Any]],
lastRawResponse: Optional[str] = None,
useCaseId: Optional[str] = None,
templateStructure: Optional[str] = None
) -> ContinuationContext:
"""
Build context information from accumulated sections for continuation prompt.
Returns summary of delivered data and cut-off point for continuation.
Args:
allSections: List of ALL sections accumulated across ALL iterations
lastRawResponse: Raw JSON response from last iteration (can be broken/incomplete)
useCaseId: Optional use case ID to determine expected JSON structure
templateStructure: JSON structure template from initial prompt (MUST be identical)
Returns:
ContinuationContext: Pydantic model with all continuation context information
"""
section_count = len(allSections)
# Build summary of delivered data (per-section counts)
summary_lines = []
summary_lines.append("Following data has already been delivered:\n")
summary_items = [] # Collect items for truncation check
for section in allSections:
section_id = section.get("id")
# CRITICAL: If section has no ID, omit it from summary
if not section_id:
continue
content_type = section.get("content_type", "")
elements = section.get("elements", [])
if content_type == "heading":
# Collect all heading elements with level and text
heading_elements = []
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
level = elem.get("level", "")
text = elem.get("text", "")
if text:
heading_elements.append(f"level {level}: {text}")
elif isinstance(elements, dict):
level = elements.get("level", "")
text = elements.get("text", "")
if text:
heading_elements.append(f"level {level}: {text}")
if heading_elements:
summary_items.append(f'- heading "{section_id}" {", ".join(heading_elements)}')
elif content_type == "paragraph":
# Count text elements
text_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict) and elem.get("text"):
text_count += 1
elif isinstance(elements, dict) and elements.get("text"):
text_count = 1
if text_count > 0:
summary_items.append(f'- paragraph with {text_count} text(s)')
elif content_type in ["bullet_list", "numbered_list"]:
# Count items across all elements
item_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
items = elem.get("items", [])
if isinstance(items, list):
item_count += len(items)
elif isinstance(elements, dict):
items = elements.get("items", [])
if isinstance(items, list):
item_count = len(items)
if item_count > 0:
summary_items.append(f'- bullet_list with {item_count} items')
elif content_type == "table":
# Count rows across all elements
row_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
rows = elem.get("rows", [])
if isinstance(rows, list):
row_count += len(rows)
elif isinstance(elements, dict):
rows = elements.get("rows", [])
if isinstance(rows, list):
row_count = len(rows)
if row_count > 0:
summary_items.append(f'- table "{section_id}" with {row_count} rows')
elif content_type == "code_block":
# Count code lines across all elements
line_count = 0
if isinstance(elements, list):
for elem in elements:
if isinstance(elem, dict):
code = elem.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
line_count += len(lines)
elif isinstance(elements, dict):
code = elements.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
line_count = len(lines)
if line_count > 0:
line_word = "line" if line_count == 1 else "lines"
summary_items.append(f'- code_block "{section_id}" with {line_count} code {line_word}')
# If no sections extracted but we have raw response, indicate that previous response was broken
if len(summary_items) == 0 and lastRawResponse:
summary_items.append("- Previous response was incomplete/broken JSON - please continue from where it stopped")
# CRITICAL: If summary is too long, truncate: show first 10 and last 10 items
if len(summary_items) > 20:
first_10 = summary_items[:10]
last_10 = summary_items[-10:]
summary_lines.extend(first_10)
summary_lines.append(f"... (truncated {len(summary_items) - 20} items) ...")
summary_lines.extend(last_10)
else:
summary_lines.extend(summary_items)
delivered_summary = "\n".join(summary_lines)
# Extract continuation contexts using centralized jsonContinuation module
# This is the single source of truth for handling cut-off JSON strings
last_raw_json = lastRawResponse or ""
last_complete_part = ""
incomplete_part = ""
overlap_context = ""
hierarchy_context = ""
if lastRawResponse:
try:
from modules.shared.jsonContinuation import getContexts
# Normalize JSON string
normalized = stripCodeFences(normalizeJsonText(lastRawResponse)).strip()
if normalized:
# Find first '{' or '[' to start
startIdx = -1
for i, char in enumerate(normalized):
if char in '{[':
startIdx = i
break
if startIdx >= 0:
jsonContent = normalized[startIdx:]
contexts = getContexts(jsonContent)
# Store all contexts from centralized module
last_complete_part = contexts.completePart
incomplete_part = jsonContent[len(contexts.completePart):].strip()
overlap_context = contexts.overlapContext
hierarchy_context = contexts.hierarchyContext
except Exception as e:
logger.warning(f"Error extracting JSON continuation contexts: {e}", exc_info=True)
# Return ContinuationContext Pydantic model
return ContinuationContext(
section_count=section_count,
delivered_summary=delivered_summary,
template_structure=templateStructure,
last_complete_part=last_complete_part,
incomplete_part=incomplete_part,
last_raw_json=last_raw_json,
overlap_context=overlap_context,
hierarchy_context=hierarchy_context
)
def parseJsonWithModel(jsonString: str, modelClass: Type[T]) -> T:
"""
Parse JSON string using Pydantic model with error handling.
Uses existing jsonUtils methods:
- extractJsonString() - Extracts JSON from text with code fences
- tryParseJson() - Safe parsing with error handling
- repairBrokenJson() - Repairs broken/incomplete JSON
Args:
jsonString: JSON string to parse (may contain code fences, extra text, etc.)
modelClass: Pydantic model class to parse into
Returns:
Parsed Pydantic model instance
Raises:
ValueError: If JSON cannot be parsed or validated
"""
if not jsonString:
raise ValueError(f"Cannot parse empty JSON string for {modelClass.__name__}")
# Step 1: Extract JSON string (handles code fences, extra text)
extractedJson = extractJsonString(jsonString)
if not extractedJson or extractedJson.strip() == "":
raise ValueError(f"No JSON found in string for {modelClass.__name__}")
# Step 2: Try to parse as JSON
parsedJson, error, cleaned = tryParseJson(extractedJson)
if error is None and parsedJson is not None:
# Successfully parsed - try to create model
try:
if isinstance(parsedJson, dict):
return modelClass(**parsedJson)
elif isinstance(parsedJson, list):
# If model expects a list, try to parse first item
if parsedJson:
return modelClass(**parsedJson[0])
else:
raise ValueError(f"Empty list cannot be parsed as {modelClass.__name__}")
else:
raise ValueError(f"Parsed JSON is not a dict or list: {type(parsedJson)}")
except ValidationError as e:
logger.error(f"Validation error parsing {modelClass.__name__}: {e}")
raise ValueError(f"Invalid data for {modelClass.__name__}: {e}")
except Exception as e:
logger.error(f"Error creating {modelClass.__name__} instance: {e}")
raise ValueError(f"Failed to create {modelClass.__name__} instance: {e}")
# Step 3: Try to repair broken JSON
logger.warning(f"Initial JSON parsing failed, attempting repair for {modelClass.__name__}")
repairedJson = repairBrokenJson(extractedJson)
if repairedJson:
# Try parsing repaired JSON
parsedRepaired, errorRepaired, _ = tryParseJson(json.dumps(repairedJson))
if errorRepaired is None and parsedRepaired is not None:
try:
if isinstance(parsedRepaired, dict):
return modelClass(**parsedRepaired)
elif isinstance(parsedRepaired, list) and parsedRepaired:
return modelClass(**parsedRepaired[0])
except ValidationError as e:
logger.error(f"Validation error parsing repaired {modelClass.__name__}: {e}")
raise ValueError(f"Invalid repaired data for {modelClass.__name__}: {e}")
except Exception as e:
logger.error(f"Error creating {modelClass.__name__} from repaired JSON: {e}")
# Step 4: All parsing failed
logger.error(f"Failed to parse JSON for {modelClass.__name__}. Cleaned JSON preview: {cleaned[:200]}...")
raise ValueError(f"Failed to parse or validate JSON for {modelClass.__name__}. JSON may be malformed or incomplete.")