gateway/modules/shared/jsonUtils.py
2025-11-19 23:51:25 +01:00

1098 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Utilities for extracting, parsing, and repairing JSON emitted by LLM responses."""
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Union, Type, TypeVar
from pydantic import BaseModel, ValidationError
# Module-level logger, named after the module per logging convention.
logger = logging.getLogger(__name__)
# Generic type bound to pydantic models; presumably used by typed parse
# helpers later in the file (not visible in this chunk) — TODO confirm.
T = TypeVar('T', bound=BaseModel)
def stripCodeFences(text: str) -> str:
    """Remove a surrounding ```lang ... ``` fence and whitespace, if present.

    Handles both multi-line fences (```json\\n...\\n```) and degenerate
    single-line fences (```...```). The previous implementation only
    stripped the opening backticks when a newline followed the language
    tag, so a fence with no newline kept its leading ``` in the output.

    Args:
        text: Raw model output that may be wrapped in a Markdown code fence.
    Returns:
        The fence payload (stripped), or the stripped input when no fence
        wraps the whole string.
    """
    if not text:
        return text
    s = text.strip()
    if not (s.startswith("```") and s.endswith("```")):
        return s
    # Drop the outer backticks, then decide what the first line is.
    inner = s[3:-3]
    tag, sep, rest = inner.partition('\n')
    if sep:
        # Everything up to the first newline is treated as the language
        # tag (commonly 'json'); the payload starts after it.
        return rest.strip()
    # Single-line fence: no newline, so there is no tag line to drop.
    return inner.strip()
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring; otherwise the trimmed input.

    Brackets that occur inside JSON string literals are ignored. The
    previous scanner counted them, so an input such as '{"a": "}"}' was
    truncated at the brace inside the string value.

    Args:
        text: Text that may contain a JSON object or array with surrounding noise.
    Returns:
        The first balanced '{...}' or '[...]' span (stripped), or the
        stripped input when no opener is found or the span never closes.
    """
    if not text:
        return text
    s = text.strip()
    # Start at whichever opener appears first.
    brace = s.find('{')
    bracket = s.find('[')
    if brace == -1 and bracket == -1:
        return s
    if brace == -1:
        start = bracket
    elif bracket == -1:
        start = brace
    else:
        start = min(brace, bracket)
    # Stack-based scan, skipping string literals (with backslash escapes).
    openers: List[str] = []
    in_string = False
    escaped = False
    for i in range(start, len(s)):
        ch = s[i]
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in '{[':
            openers.append(ch)
        elif ch in '}]':
            if not openers:
                # Stray closer before/outside any opener: ignore it.
                continue
            opener = openers.pop()
            if (opener == '{' and ch != '}') or (opener == '[' and ch != ']'):
                # Mismatched pair: keep scanning (same tolerance as before).
                continue
            if not openers:
                return s[start:i + 1].strip()
    return s
def normalizeJsonText(text: str) -> str:
    """Light normalization: strip a UTF-8 BOM and convert smart quotes to ASCII.

    The quote characters are written as Unicode escapes so the intent
    survives copy/paste and encoding round-trips. In the previous version
    the literal smart-quote characters were lost, leaving calls like
    ``replace('', '"')`` — and ``str.replace`` with an empty needle inserts
    the replacement between every character, destroying the string.

    Args:
        text: Possibly BOM-prefixed text with typographic quotes.
    Returns:
        The normalized text (unchanged when empty).
    """
    if not text:
        return text
    s = text
    # Remove UTF-8 BOM if present.
    if s.startswith('\ufeff'):
        s = s.lstrip('\ufeff')
    # Curly double quotes (U+201C / U+201D) -> straight double quote.
    s = s.replace('\u201c', '"').replace('\u201d', '"')
    # Curly single quotes (U+2018 / U+2019) -> straight apostrophe.
    s = s.replace('\u2018', "'").replace('\u2019', "'")
    return s
def extractJsonString(text: str) -> str:
    """Run the full cleanup pipeline on raw model output.

    Pipeline: normalize (BOM / smart quotes), strip Markdown code fences,
    then extract the first balanced JSON object/array substring.
    """
    normalized = normalizeJsonText(text)
    unfenced = stripCodeFences(normalized)
    balanced = extractFirstBalancedJson(unfenced)
    return balanced.strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Clean up *text* and attempt to parse it as JSON.

    Returns:
        A ``(obj, error, cleaned_str)`` triple: the parsed object and
        ``None`` on success, or ``None`` plus the raised exception on
        failure. ``cleaned_str`` is always the cleaned-up candidate text.
    """
    if isinstance(text, bytes):
        try:
            text = text.decode('utf-8', errors='replace')
        except Exception:
            text = str(text)
    cleaned = extractJsonString(text or "")
    try:
        parsed = json.loads(cleaned)
    except Exception as exc:
        return None, exc, cleaned
    return parsed, None, cleaned
def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]:
    """Parse JSON from *text*, re-raising the underlying parse error on failure.

    Logs a truncated preview of the cleaned text before raising, so failed
    payloads can be diagnosed from the logs.
    """
    obj, err, cleaned = tryParseJson(text)
    if err is None:
        return obj
    logger.error(f"parse_json_or_raise failed: {err}. Cleaned preview: {cleaned[:200]}...")
    raise err
def mergeRootLists(jsonParts: List[Union[str, Dict, List]]) -> Dict[str, Any]:
    """
    Generic merger for root-level lists: take the first dict as base; for each
    subsequent part:
    - if the value is a list and the same key exists as a list, concatenate
    - if the key is absent, add it
    - for non-list keys, keep the original (from the first part)
    Sets continuation=None if present in the base.

    String parts are parsed with tryParseJson; unparsable parts and
    non-dict roots are skipped. Returns {} when nothing usable remains.

    Fix: the previous version called ``base[k].extend(v)`` through a
    shallow ``dict(...)`` copy, mutating the list objects inside the
    caller's first dict. Concatenation now builds a new list instead.
    """
    parsed: List[Dict[str, Any]] = []
    for part in jsonParts:
        if isinstance(part, (dict, list)):
            obj = part
        else:
            obj, err, _ = tryParseJson(part)
            if err is not None or not isinstance(obj, (dict, list)):
                continue
        # Only dict roots participate in the merge; list roots are dropped.
        if isinstance(obj, dict):
            parsed.append(obj)
    if not parsed:
        return {}
    base: Dict[str, Any] = dict(parsed[0])
    for obj in parsed[1:]:
        for k, v in obj.items():
            if isinstance(v, list) and isinstance(base.get(k), list):
                # New list object: never extend the input's list in place.
                base[k] = base[k] + v
            elif k not in base:
                base[k] = v
    if 'continuation' in base:
        base['continuation'] = None
    return base
def repairBrokenJson(text: str) -> Optional[Dict[str, Any]]:
    """
    Attempt to repair broken JSON using multiple strategies.
    Generic solution that works for any content type.

    Strategies, in order:
      1. regex-extract section objects from the raw text and wrap them in
         the standard document envelope;
      2. progressive parsing: find the longest prefix that parses once
         dangling brackets are auto-closed, then mine the truncated tail
         for extra sections;
      3. auto-close dangling brackets on the whole text and retry.

    Returns the best repair attempt or None if all fail.
    """
    if not text:
        return None
    # Strategy 1: Try to extract sections from the entire text first.
    # This handles cases where the JSON structure is broken but content is intact.
    extractedSections = _extractSectionsRegex(text)
    if extractedSections:
        logger.info(f"Extracted {len(extractedSections)} sections using regex")
        # Wrap the recovered sections in the envelope shape the rest of the
        # pipeline expects (metadata + documents[].sections).
        return {
            "metadata": {
                "split_strategy": "single_document",
                "source_documents": [],
                "extraction_method": "ai_generation"
            },
            "documents": [{"sections": extractedSections}]
        }
    # Strategy 2: Progressive parsing - try to find the longest valid prefix.
    bestResult = None
    bestValidLength = 0
    # Coarse-to-fine step sizes: large steps locate the valid region fast,
    # smaller steps refine the exact cut-off point.
    for stepSize in [100, 50, 10, 1]:
        for i in range(len(text), 0, -stepSize):
            testStr = text[:i]
            closedStr = _closeJsonStructures(testStr)
            obj, err, _ = tryParseJson(closedStr)
            if err is None and isinstance(obj, dict):
                bestResult = obj
                bestValidLength = i
                logger.debug(f"Progressive parsing success at length {i} (step: {stepSize})")
                break
        if bestResult:
            break
    if bestResult:
        logger.info(f"Repaired JSON using progressive parsing (valid length: {bestValidLength})")
        # Check whether the valid prefix already contains sections.
        sections = extractSectionsFromDocument(bestResult)
        if sections:
            logger.info(f"Progressive parsing found {len(sections)} sections")
            return bestResult
        else:
            # No sections in the valid prefix; mine the truncated tail instead.
            logger.info("Progressive parsing found no sections, trying to extract from broken part")
            extractedSections = _extractSectionsRegex(text[bestValidLength:])
            if extractedSections:
                logger.info(f"Extracted {len(extractedSections)} sections from broken part")
                # Merge the recovered tail sections into the valid part,
                # creating the documents scaffold if it is missing/empty.
                if "documents" not in bestResult:
                    bestResult["documents"] = []
                if not bestResult["documents"]:
                    bestResult["documents"] = [{"sections": []}]
                bestResult["documents"][0]["sections"].extend(extractedSections)
        return bestResult
    # Strategy 3: Structure closing - close incomplete structures and retry.
    closedStr = _closeJsonStructures(text)
    obj, err, _ = tryParseJson(closedStr)
    if err is None and isinstance(obj, dict):
        logger.info("Repaired JSON using structure closing")
        return obj
    logger.warning("All repair strategies failed")
    return None
def _closeJsonStructures(text: str) -> str:
    """
    Close incomplete JSON structures by appending the missing closers.

    Improvements over the previous count-based version:
    - brackets inside string literals are ignored (counting them miscounted
      any payload whose strings contain '{', '}', '[' or ']');
    - an unterminated string literal is closed first;
    - closers are emitted in reverse nesting order (innermost first), so
      '{"a": [1' becomes the valid '{"a": [1]}' instead of the invalid
      '{"a": [1}]' produced by appending all '}' before all ']'.

    Args:
        text: A possibly-truncated JSON fragment.
    Returns:
        The fragment with closing quotes/brackets appended (unchanged when empty).
    """
    if not text:
        return text
    open_stack: List[str] = []
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in '{[':
            open_stack.append(ch)
        elif ch == '}':
            # Only pop a matching opener; stray closers are ignored.
            if open_stack and open_stack[-1] == '{':
                open_stack.pop()
        elif ch == ']':
            if open_stack and open_stack[-1] == '[':
                open_stack.pop()
    result = text
    if in_string:
        # Terminate the dangling string so the closers below can parse.
        result += '"'
    for opener in reversed(open_stack):
        result += '}' if opener == '{' else ']'
    return result
def _extractSectionsRegex(text: str) -> List[Dict[str, Any]]:
    """
    Extract sections from broken JSON using regex patterns.
    Generic solution that works for any content type.

    Looks for ``"id": "section_N", "content_type": ..., "order": N``
    triples (in that order, optionally comma-separated), then tries to
    recover each section's "elements" array from the text following the
    match. Falls back to _extractGenericContent when no section headers
    are found.
    """
    import re
    sections = []
    # Pattern to find section objects; the three keys must appear in this
    # exact order for a match.
    sectionPattern = r'"id"\s*:\s*"(section_\d+)"\s*,?\s*"content_type"\s*:\s*"(\w+)"\s*,?\s*"order"\s*:\s*(\d+)'
    for match in re.finditer(sectionPattern, text, re.IGNORECASE):
        sectionId = match.group(1)
        contentType = match.group(2)
        order = int(match.group(3))
        # Try to extract the elements array - search a 5000-char window
        # after this section header; non-greedy, so it stops at the first
        # ']' (nested arrays may therefore be cut short).
        elementsMatch = re.search(
            r'"elements"\s*:\s*\[(.*?)\]',
            text[match.end():match.end()+5000]  # Look ahead for elements (large range)
        )
        elements = []
        if elementsMatch:
            try:
                elementsStr = '[' + elementsMatch.group(1) + ']'
                elements = json.loads(elementsStr)
            except:
                # If JSON parsing fails, extract individual items manually.
                # NOTE(review): bare `except:` also swallows KeyboardInterrupt /
                # SystemExit; `except Exception:` would be safer here.
                elementsText = elementsMatch.group(1)
                elements = _extractElementsFromText(elementsText, contentType)
        sections.append({
            "id": sectionId,
            "content_type": contentType,
            "elements": elements,
            "order": order
        })
    # If no sections were found with the main pattern, fall back to
    # generic content-pattern extraction.
    if not sections:
        sections = _extractGenericContent(text)
    return sections
def _extractElementsFromText(elementsText: str, contentType: str) -> List[Dict[str, Any]]:
    """
    Extract elements from text when JSON parsing fails.
    Generic approach that works for any content type.

    Each branch collects both complete items (fully closed JSON snippets)
    and incomplete items (cut off at a newline or end of text), dedupes
    them preserving order via dict.fromkeys, and asks
    _removeLastIncompleteItem to drop a trailing corrupted item.

    NOTE(review): for the "heading" and "code" branches the deduped items
    are tuples, although _removeLastIncompleteItem is annotated List[str];
    it only slices the list, so this works in practice.
    """
    import re
    elements = []
    if contentType == "list":
        # Complete {"text": "..."} items.
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', elementsText)
        # Incomplete items cut at a newline or end of text, e.g. {"text": "36
        incomplete_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)
        all_items = text_items + incomplete_items
        # Order-preserving dedupe; drop blank strings.
        unique_items = list(dict.fromkeys([item for item in all_items if item.strip()]))
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)
        elements = [{"text": item} for item in unique_items]
    elif contentType == "paragraph":
        # Same extraction as "list": complete + incomplete {"text": ...} items.
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', elementsText)
        incomplete_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)
        all_items = text_items + incomplete_items
        unique_items = list(dict.fromkeys([item for item in all_items if item.strip()]))
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)
        elements = [{"text": item} for item in unique_items]
    elif contentType == "heading":
        # {"level": X, "text": "..."} pairs, complete and incomplete.
        heading_items = re.findall(r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*)"\}', elementsText)
        incomplete_heading_items = re.findall(r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)
        all_items = heading_items + incomplete_heading_items
        # Items become (int level, text) tuples here.
        unique_items = list(dict.fromkeys([(int(level), text) for level, text in all_items if text.strip()]))
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)
        elements = [{"level": level, "text": text} for level, text in unique_items]
    elif contentType == "table":
        # {"headers": [...], "rows": [...], "caption": "..."} objects only;
        # incomplete tables are not recovered in this branch.
        table_items = re.findall(r'\{"headers"\s*:\s*\[(.*?)\]\s*,\s*"rows"\s*:\s*\[(.*?)\]\s*,\s*"caption"\s*:\s*"([^"]*)"\}', elementsText)
        for headers_str, rows_str, caption in table_items:
            # Header cells are the quoted strings inside the headers array.
            headers = re.findall(r'"([^"]+)"', headers_str)
            # Rows: each [...] group becomes a row of its quoted strings
            # (simplified — nested brackets are not handled).
            rows = []
            row_matches = re.findall(r'\[(.*?)\]', rows_str)
            for row_match in row_matches:
                row_items = re.findall(r'"([^"]+)"', row_match)
                rows.append(row_items)
            elements.append({
                "headers": headers,
                "rows": rows,
                "caption": caption
            })
    elif contentType == "code":
        # Complete {"code": "...", "language": "..."} pairs; incomplete
        # code snippets get language "unknown".
        code_items = re.findall(r'\{"code"\s*:\s*"([^"]*)"\s*,\s*"language"\s*:\s*"([^"]*)"\}', elementsText)
        incomplete_code_items = re.findall(r'\{"code"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)
        all_items = code_items + [(code, "unknown") for code in incomplete_code_items]
        unique_items = list(dict.fromkeys([(code, lang) for code, lang in all_items if code.strip()]))
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)
        elements = [{"code": code, "language": lang} for code, lang in unique_items]
    else:
        # Generic fallback - any "text": "..." value, complete or incomplete
        # (note: no surrounding braces required here, unlike the branches above).
        text_items = re.findall(r'"text"\s*:\s*"([^"]*)"', elementsText)
        incomplete_text_items = re.findall(r'"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)
        all_items = text_items + incomplete_text_items
        unique_items = list(dict.fromkeys([item for item in all_items if item.strip()]))
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)
        elements = [{"text": item} for item in unique_items]
    return elements
def _removeLastIncompleteItem(items: List[str], original_text: str) -> List[str]:
    """
    Drop the final item when *original_text* ends in a truncated JSON construct.

    Keeps corrupted tail data out of the final result. The decision looks
    only at how the source text ends, never at the items themselves; when
    no truncation pattern matches, the list is returned unchanged.
    """
    import re
    if not items:
        return items
    # Each entry pairs an end-of-text truncation regex with its debug message.
    truncation_checks = [
        # e.g. {"text": "36            (cut-off string value)
        (r'\{"[^"]*"\s*:\s*"[^"]*$', "Detected incomplete string at end - removing last item"),
        # e.g. {"bool_flag": tr      (cut-off boolean)
        (r'\{"[^"]*"\s*:\s*(true|false|tr|fa)$', "Detected incomplete boolean at end - removing last item"),
        # e.g. {"number": 123        (cut-off number)
        (r'\{"[^"]*"\s*:\s*\d+$', "Detected incomplete number at end - removing last item"),
        # e.g. {"array": [1,2,3      (unclosed array)
        (r'\{"[^"]*"\s*:\s*\[[^\]]*$', "Detected incomplete array at end - removing last item"),
        # e.g. {"obj": {"key": "val  (unclosed object)
        (r'\{"[^"]*"\s*:\s*\{[^}]*$', "Detected incomplete object at end - removing last item"),
    ]
    for pattern, message in truncation_checks:
        if re.search(pattern, original_text):
            logger.debug(message)
            return items[:-1]
    # A trailing comma is a common sign of truncated JSON as well.
    if original_text.rstrip().endswith(','):
        logger.debug("Detected trailing comma - removing last item")
        return items[:-1]
    return items
def _extractGenericContent(text: str) -> List[Dict[str, Any]]:
    """
    Extract generic content when no specific section patterns are found.
    This handles cases where the JSON structure is completely broken.

    Tries three patterns in priority order (first match wins) and wraps
    whatever it finds in a single synthetic section ("section_1"). The
    helper _removeLastIncompleteItem drops a trailing corrupted item.
    """
    import re
    sections = []
    # Pattern 1: {"text": "..."} items, complete and incomplete.
    list_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
    incomplete_list_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
    all_list_items = list_items + incomplete_list_items
    # Order-preserving dedupe; drop blank strings.
    unique_list_items = list(dict.fromkeys([item for item in all_list_items if item.strip()]))
    if unique_list_items:
        unique_list_items = _removeLastIncompleteItem(unique_list_items, text)
    if unique_list_items:
        # Found item-shaped content: emit it as a synthetic "list" section.
        elements = [{"text": item} for item in unique_list_items]
        sections.append({
            "id": "section_1",
            "content_type": "list",
            "elements": elements,
            "order": 1
        })
    # Pattern 2: {"text": ... } spans (possibly cut before the closing quote).
    elif re.search(r'\{"text"\s*:\s*"[^"]*\}', text):
        # Extract all text elements, including incomplete ones.
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
        incomplete_text_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)
        all_text_items = text_items + incomplete_text_items
        unique_text_items = list(dict.fromkeys([item for item in all_text_items if item.strip()]))
        if unique_text_items:
            unique_text_items = _removeLastIncompleteItem(unique_text_items, text)
        if unique_text_items:
            elements = [{"text": item} for item in unique_text_items]
            sections.append({
                "id": "section_1",
                "content_type": "paragraph",
                "elements": elements,
                "order": 1
            })
    # Pattern 3: any quoted string longer than 3 chars that might be content.
    elif re.search(r'"([^"]{3,})"', text):
        text_items = re.findall(r'"([^"]{3,})"', text)
        incomplete_text_items = re.findall(r'"([^"]{3,}?)(?:\n|$)', text)
        all_text_items = text_items + incomplete_text_items
        # Filter out strings that look like JSON keys rather than content.
        content_items = [item for item in all_text_items if not item.startswith(('section_', 'doc_', 'metadata', 'split_strategy', 'source_documents', 'extraction_method', 'id', 'content_type', 'elements', 'order', 'title', 'filename'))]
        if content_items:
            content_items = _removeLastIncompleteItem(content_items, text)
        if content_items:
            # Cap at 10 items to avoid dumping arbitrary quoted noise.
            elements = [{"text": item} for item in content_items[:10]]
            sections.append({
                "id": "section_1",
                "content_type": "paragraph",
                "elements": elements,
                "order": 1
            })
    return sections
def extractSectionsFromDocument(documentData: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect every section from a document payload.

    Supports two shapes: a "documents" array whose entries each carry a
    "sections" list, or a flat payload with "sections" at the root. The
    "documents" key takes precedence when both are present; any other
    shape (including non-dict input) yields an empty list.
    """
    if not isinstance(documentData, dict):
        return []
    if "documents" in documentData:
        collected: List[Dict[str, Any]] = []
        for document in documentData.get("documents", []):
            if not isinstance(document, dict):
                continue
            inner = document.get("sections")
            if isinstance(inner, list):
                collected.extend(inner)
        return collected
    root_sections = documentData.get("sections")
    return root_sections if isinstance(root_sections, list) else []
def extractContentSample(section: Dict[str, Any]) -> str:
    """
    Extract a short sample of a section's content for continuation context.

    Returns a human-readable description of the most recent content.

    Fix: the paragraph/code/table branches previously returned the *first*
    matching element of the trailing 5-element window (i.e. the 5th-from-
    last element) instead of the last one, contradicting the documented
    "last content" behavior; they now scan the window in reverse.
    """
    if not isinstance(section, dict):
        return ""
    content_type = section.get("content_type", "").lower()
    elements = section.get("elements", [])
    if not elements or not isinstance(elements, list):
        return "Content exists"
    # Only the trailing window of (at most) 5 elements is sampled.
    sample_elements = elements[-5:]
    if content_type == "list":
        # Aggregate the texts of the trailing items; show up to three.
        items_text = [
            elem.get("text", "")
            for elem in sample_elements
            if isinstance(elem, dict) and "text" in elem
        ]
        if items_text:
            return f"Last {len(items_text)} items: {', '.join(items_text[:3])}"
    elif content_type == "paragraph":
        # Latest text element, trimmed to its final 150 characters.
        for elem in reversed(sample_elements):
            if isinstance(elem, dict) and "text" in elem:
                text = elem.get("text", "")
                if len(text) > 150:
                    text = "..." + text[-150:]
                return f"Last content: {text}"
    elif content_type == "code":
        # Latest code element: show its final lines when it is long.
        for elem in reversed(sample_elements):
            if isinstance(elem, dict) and "code" in elem:
                lines = elem.get("code", "").split('\n')
                if len(lines) > 5:
                    return f"Last lines ({len(lines)} total): {', '.join(lines[-3:])}"
                return f"Code ({len(lines)} lines)"
    elif content_type == "table":
        # Latest table element: report its row count.
        for elem in reversed(sample_elements):
            if isinstance(elem, dict) and "rows" in elem:
                rows = elem.get("rows", [])
                return f"Table with {len(rows)} rows"
    # Unknown content type, or no matching element in the window.
    return "Content exists"
def _buildDetailedContinuationInfo(section: Dict[str, Any], content_type: str) -> Dict[str, Any]:
    """
    Build detailed continuation information for better AI guidance.

    Fully generic: the result has the same shape for any content type
    (list, paragraph, code, table, ...). The *content_type* argument is
    accepted for interface compatibility but does not influence the
    output; only the presence and count of "elements" does.
    """
    elements = section.get("elements", [])
    if not elements:
        # Empty section: generic "keep going" guidance with a zero count.
        return {
            "type": "continue_general",
            "sample": extractContentSample(section),
            "last_item": "",
            "item_count": 0,
            "guidance": "Continue generating content in the same format and style.",
        }
    # Non-empty section: same structure, element count plus a content sample.
    return {
        "type": "continue_general",
        "sample": extractContentSample(section),
        "last_item": "",
        "item_count": len(elements),
        "guidance": "Generate ALL remaining content to complete the user's request. Continue from where you left off and finish everything that was requested.",
    }
def _extractLastItemsFromFragment(fragment: str, max_items: int = 10) -> str:
    """
    Extract the last few items from a JSON fragment for continuation context.
    Uses JSON structure (sections -> elements -> items) - fully generic.
    Works with broken/incomplete JSON by trying to parse and extract sections.

    Four strategies are attempted in order, each one cheaper and less
    structural than the previous; the first that yields text wins:
      1. repair + full structural extraction from the last section;
      2. progressive parsing of trailing slices of the fragment;
      3. raw regex over the last 2000 characters;
      4. give up and return "".
    """
    if not fragment:
        return ""
    # Strategy 1: Try to parse as JSON and extract from structure.
    try:
        # Try to repair and parse the fragment.
        parsed = repairBrokenJson(fragment)
        if parsed:
            # Extract sections from parsed JSON using structure.
            sections = extractSectionsFromDocument(parsed)
            if sections:
                # The highest-"order" section is where continuation happens.
                sorted_sections = sorted(sections, key=lambda s: s.get("order", 0))
                last_section = sorted_sections[-1]
                elements = last_section.get("elements", [])
                if elements and isinstance(elements, list):
                    content_type = last_section.get("content_type", "").lower()
                    # For list content_type, extract from the items array.
                    if content_type == "list" and len(elements) > 0:
                        last_element = elements[-1]
                        if isinstance(last_element, dict):
                            # Check if it has an "items" array (list structure).
                            if "items" in last_element and isinstance(last_element["items"], list):
                                items_list = last_element["items"]
                                if items_list:
                                    # Take at most max_items from the tail.
                                    last_items = items_list[-max_items:] if len(items_list) > max_items else items_list
                                    # Extract the text of each item.
                                    texts = []
                                    for item in last_items:
                                        if isinstance(item, dict) and "text" in item:
                                            texts.append(str(item["text"]))
                                    if texts:
                                        return ', '.join(texts)
                            # Or if elements themselves are items (alternative structure).
                            elif "text" in last_element:
                                # Take the trailing elements that carry text.
                                elements_with_text = [e for e in elements if isinstance(e, dict) and "text" in e]
                                if elements_with_text:
                                    last_elements = elements_with_text[-max_items:] if len(elements_with_text) > max_items else elements_with_text
                                    texts = [str(e.get("text", "")) for e in last_elements]
                                    if texts:
                                        return ', '.join(texts)
                    # For other content types, extract from elements directly.
                    elif len(elements) > 0:
                        # Keep only elements that carry text or code.
                        valid_elements = [e for e in elements if isinstance(e, dict) and ("text" in e or "code" in e)]
                        if valid_elements:
                            last_elements = valid_elements[-max_items:] if len(valid_elements) > max_items else valid_elements
                            texts = []
                            for elem in last_elements:
                                if "text" in elem:
                                    texts.append(str(elem["text"]))
                                elif "code" in elem:
                                    # For code, show only a short snippet.
                                    code = str(elem["code"])
                                    texts.append(code[:50] + "..." if len(code) > 50 else code)
                            if texts:
                                return ', '.join(texts)
    except Exception as e:
        logger.debug(f"Could not extract items from fragment using JSON structure: {e}")
    # Strategy 2: If parsing failed, try progressive parsing from the end —
    # look for the last complete JSON structures near the end of the fragment.
    try:
        # Try parsing windows of decreasing size taken from the tail.
        for length in [3000, 2000, 1000, 500]:
            if len(fragment) > length:
                end_portion = fragment[-length:]
                closed = _closeJsonStructures(end_portion)
                obj, err, _ = tryParseJson(closed)
                if err is None and isinstance(obj, dict):
                    # Successfully parsed - extract sections.
                    sections = extractSectionsFromDocument(obj)
                    if sections:
                        # Same extraction approach as Strategy 1.
                        sorted_sections = sorted(sections, key=lambda s: s.get("order", 0))
                        if sorted_sections:
                            last_section = sorted_sections[-1]
                            elements = last_section.get("elements", [])
                            if elements:
                                # Collect texts from the trailing elements.
                                texts = []
                                for elem in elements[-max_items:]:
                                    if isinstance(elem, dict):
                                        if "items" in elem and isinstance(elem["items"], list):
                                            # Take the last entry of the items array.
                                            if elem["items"]:
                                                last_item = elem["items"][-1]
                                                if isinstance(last_item, dict) and "text" in last_item:
                                                    texts.append(str(last_item["text"]))
                                        elif "text" in elem:
                                            texts.append(str(elem["text"]))
                                if texts:
                                    return ', '.join(texts[-max_items:])
    except Exception as e:
        logger.debug(f"Progressive parsing from end failed: {e}")
    # Strategy 3: If all parsing fails, try simple regex extraction from the
    # raw fragment - look for the last complete {"text": "..."} patterns.
    try:
        # Only scan the last 2000 chars for the pattern.
        end_portion = fragment[-2000:] if len(fragment) > 2000 else fragment
        import re
        # Pattern to match {"text": "..."} occurrences.
        pattern = r'\{"text"\s*:\s*"([^"]+)"\}'
        matches = re.findall(pattern, end_portion)
        if matches:
            # Keep at most max_items from the tail.
            last_matches = matches[-max_items:] if len(matches) > max_items else matches
            return ', '.join(last_matches)
    except Exception as e:
        logger.debug(f"Simple pattern extraction failed: {e}")
    # Strategy 4: Nothing worked; the caller falls back to its own data.
    return ""
def buildContinuationContext(allSections: List[Dict[str, Any]], lastRawResponse: Optional[str] = None) -> Dict[str, Any]:
"""
Build context information from accumulated sections for continuation prompt.
Extracts last items and provides clear continuation point.
CRITICAL: Analyzes ALL accumulated sections (not just last response) to provide
accurate progress information to AI. This allows AI to understand completion status
without seeing the entire content (which would exceed token limits).
Args:
allSections: List of ALL sections accumulated across ALL iterations
lastRawResponse: Raw JSON response from last iteration (can be broken/incomplete)
Returns:
Dict with section_count, last_raw_json, last_items, continuation point, and
PROGRESS STATISTICS from all accumulated sections
"""
context = {
"section_count": len(allSections),
}
# CRITICAL: Analyze ALL accumulated sections to get accurate progress statistics
# This allows AI to understand completion status without seeing entire content
# GENERIC approach: Works for all task types (books, reports, code, lists, etc.)
totalRows = 0
totalItems = 0
totalCodeLines = 0
totalParagraphs = 0
totalHeadings = 0
totalContentSize = 0
contentTypes = set()
lastContentType = None
for section in allSections:
contentType = section.get("content_type", "")
contentTypes.add(contentType)
elements = section.get("elements", [])
# CRITICAL: Iterate through ALL elements, not just the last one
# This ensures we count all rows/items/lines from all elements in the section
if isinstance(elements, list):
# Multiple elements - iterate through all
for elem in elements:
if isinstance(elem, dict):
if contentType == "code_block":
code = elem.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
totalCodeLines += len(lines)
totalContentSize += len(code)
lastContentType = "code_block"
elif contentType == "table":
rows = elem.get("rows", [])
if isinstance(rows, list):
totalRows += len(rows) # Count ALL rows from ALL table elements
totalContentSize += len(str(rows))
lastContentType = "table"
elif contentType in ["bullet_list", "numbered_list"]:
items = elem.get("items", [])
if isinstance(items, list):
totalItems += len(items) # Count ALL items from ALL list elements
totalContentSize += len(str(items))
lastContentType = "list"
elif contentType == "heading":
text = elem.get("text", "")
if text:
totalHeadings += 1
totalContentSize += len(text)
lastContentType = "heading"
elif contentType == "paragraph":
text = elem.get("text", "")
if text:
totalParagraphs += 1
totalContentSize += len(text)
lastContentType = "paragraph"
elif isinstance(elements, dict):
# Single element as dict
elem = elements
if contentType == "code_block":
code = elem.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
totalCodeLines += len(lines)
totalContentSize += len(code)
lastContentType = "code_block"
elif contentType == "table":
rows = elem.get("rows", [])
if isinstance(rows, list):
totalRows += len(rows)
totalContentSize += len(str(rows))
lastContentType = "table"
elif contentType in ["bullet_list", "numbered_list"]:
items = elem.get("items", [])
if isinstance(items, list):
totalItems += len(items)
totalContentSize += len(str(items))
lastContentType = "list"
elif contentType == "heading":
text = elem.get("text", "")
if text:
totalHeadings += 1
totalContentSize += len(text)
lastContentType = "heading"
elif contentType == "paragraph":
text = elem.get("text", "")
if text:
totalParagraphs += 1
totalContentSize += len(text)
lastContentType = "paragraph"
# Store progress statistics (not full content - that would exceed token limits)
# These statistics help AI understand progress for ALL task types
context["progress_stats"] = {
"total_rows": totalRows,
"total_items": totalItems,
"total_code_lines": totalCodeLines,
"total_paragraphs": totalParagraphs,
"total_headings": totalHeadings,
"total_content_size": totalContentSize,
"section_count": len(allSections),
"content_type_count": len(contentTypes),
"content_types": list(contentTypes),
"last_content_type": lastContentType
}
# Extract last complete sub-item from allSections (already merged, contains all delivered data)
# Extract cut/incomplete sub-item from raw JSON (what was cut off)
last_complete_subobject = None
cut_subobject = None
content_type_for_items = None
total_items_count = 0
# STEP 1: Extract last complete sub-item from allSections (this is what was already delivered)
if allSections:
sorted_sections = sorted(allSections, key=lambda s: s.get("order", 0))
last_section = sorted_sections[-1]
content_type_for_items = last_section.get("content_type", "")
elements = last_section.get("elements", [])
if elements and isinstance(elements, list) and len(elements) > 0:
last_element = elements[-1]
if isinstance(last_element, dict):
# TABLE: Extract last complete row
if content_type_for_items == "table" and "rows" in last_element:
rows = last_element.get("rows", [])
if rows and isinstance(rows, list) and len(rows) > 0:
total_items_count = len(rows)
last_complete_subobject = rows[-1]
# LIST: Extract last complete item
elif content_type_for_items in ["bullet_list", "numbered_list"] and "items" in last_element:
items = last_element.get("items", [])
if items and isinstance(items, list) and len(items) > 0:
total_items_count = len(items)
last_complete_subobject = items[-1]
# CODE_BLOCK: Extract last complete line
elif content_type_for_items == "code_block" and "code" in last_element:
code = last_element.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
total_items_count = len(lines)
if lines:
last_complete_subobject = lines[-1]
# PARAGRAPH/HEADING: Extract last complete sentence
elif content_type_for_items in ["paragraph", "heading"] and "text" in last_element:
text = last_element.get("text", "")
if text:
import re
sentences = re.split(r'([.!?]+)', text)
complete_sentences = []
for i in range(0, len(sentences) - 1, 2):
if i + 1 < len(sentences):
complete_sentences.append(sentences[i] + sentences[i + 1])
total_items_count = len(complete_sentences)
if complete_sentences:
last_complete_subobject = complete_sentences[-1]
# STEP 2: Extract cut/incomplete sub-item from raw JSON (what was cut off)
if lastRawResponse:
raw_json = stripCodeFences(lastRawResponse.strip())
if raw_json and raw_json.strip() != "{}":
try:
import re
if content_type_for_items == "code_block":
# Find incomplete code line at the end
# Look for code string that doesn't end with closing quote
code_match = re.search(r'"code"\s*:\s*"([^"]*?)(?:"|$)', raw_json)
if code_match:
code_content = code_match.group(1)
try:
code_content = json.loads('"' + code_content + '"')
except:
pass
lines = code_content.split('\n')
if lines and not raw_json.rstrip().endswith('"'):
# Code string is incomplete - last line is cut
cut_subobject = lines[-1] if lines else None
elif content_type_for_items == "table":
# Find incomplete row at the end
row_pattern = r'\["([^"]*)"(?:,\s*"([^"]*)")*'
matches = list(re.finditer(row_pattern, raw_json))
if matches:
last_match = matches[-1]
end_pos = last_match.end()
if end_pos < len(raw_json):
remaining = raw_json[end_pos:end_pos+20].strip()
if not remaining.startswith(']'):
# Row is incomplete - extract values
cut_values = re.findall(r'"([^"]*)"', raw_json[last_match.start():last_match.end()])
if cut_values:
cut_subobject = cut_values
elif content_type_for_items in ["bullet_list", "numbered_list"]:
# Find incomplete item at the end
item_pattern = r'"([^"]*)"'
matches = list(re.finditer(item_pattern, raw_json))
if matches:
last_match = matches[-1]
end_pos = last_match.end()
if end_pos < len(raw_json):
remaining = raw_json[end_pos:end_pos+10].strip()
if remaining and remaining[0] not in [',', ']', '}', '"']:
cut_subobject = last_match.group(1)
except Exception as e:
logger.debug(f"Could not extract cut sub-object from raw JSON: {e}")
context["last_raw_json"] = raw_json
else:
context["last_raw_json"] = ""
else:
context["last_raw_json"] = ""
# Convert to JSON strings
if last_complete_subobject is not None:
try:
last_complete_subobject = json.dumps(last_complete_subobject)
except:
last_complete_subobject = str(last_complete_subobject)
if cut_subobject is not None:
try:
cut_subobject = json.dumps(cut_subobject)
except:
cut_subobject = str(cut_subobject)
context["last_item_object"] = last_complete_subobject if last_complete_subobject else ""
context["cut_item_object"] = cut_subobject if cut_subobject else None
context["content_type_for_items"] = content_type_for_items
context["total_items_count"] = total_items_count
return context
def parseJsonWithModel(jsonString: str, modelClass: Type[T]) -> T:
    """
    Parse a JSON string into a Pydantic model, with extraction and repair fallbacks.

    Pipeline (all helpers are defined in this module):
        1. extractJsonString() - pull the JSON payload out of code fences / extra text
        2. tryParseJson()      - safe parsing with error reporting
        3. repairBrokenJson()  - attempt to repair broken/incomplete JSON, then re-parse

    Args:
        jsonString: JSON string to parse (may contain code fences, extra text, etc.).
        modelClass: Pydantic model class to instantiate. If the payload parses to a
            non-empty list, the first element is validated against the model.

    Returns:
        Parsed Pydantic model instance.

    Raises:
        ValueError: If JSON cannot be parsed, repaired, or validated.
    """
    if not jsonString:
        raise ValueError(f"Cannot parse empty JSON string for {modelClass.__name__}")

    # Step 1: Extract JSON string (handles code fences, extra text)
    extractedJson = extractJsonString(jsonString)
    if not extractedJson or extractedJson.strip() == "":
        raise ValueError(f"No JSON found in string for {modelClass.__name__}")

    # Step 2: Try to parse as JSON
    parsedJson, error, cleaned = tryParseJson(extractedJson)
    if error is None and parsedJson is not None:
        # Successfully parsed - try to create model
        try:
            if isinstance(parsedJson, dict):
                return modelClass(**parsedJson)
            elif isinstance(parsedJson, list):
                # If model expects a list payload, validate the first item
                if parsedJson:
                    return modelClass(**parsedJson[0])
                else:
                    raise ValueError(f"Empty list cannot be parsed as {modelClass.__name__}")
            else:
                raise ValueError(f"Parsed JSON is not a dict or list: {type(parsedJson)}")
        except ValidationError as e:
            logger.error(f"Validation error parsing {modelClass.__name__}: {e}")
            raise ValueError(f"Invalid data for {modelClass.__name__}: {e}") from e
        except ValueError:
            # Our own ValueErrors above must propagate unchanged; previously they
            # were swallowed by the generic handler and re-wrapped with a
            # misleading "Failed to create ... instance" message.
            raise
        except Exception as e:
            logger.error(f"Error creating {modelClass.__name__} instance: {e}")
            raise ValueError(f"Failed to create {modelClass.__name__} instance: {e}") from e

    # Step 3: Try to repair broken JSON
    logger.warning(f"Initial JSON parsing failed, attempting repair for {modelClass.__name__}")
    repairedJson = repairBrokenJson(extractedJson)
    if repairedJson:
        # NOTE(review): repairBrokenJson appears to return a parsed Python object
        # (it is serialized here before re-parsing) - confirm against its definition.
        # The dumps/tryParseJson round-trip keeps the repaired data on the same
        # normalization path as step 2.
        parsedRepaired, errorRepaired, _ = tryParseJson(json.dumps(repairedJson))
        if errorRepaired is None and parsedRepaired is not None:
            try:
                if isinstance(parsedRepaired, dict):
                    return modelClass(**parsedRepaired)
                elif isinstance(parsedRepaired, list) and parsedRepaired:
                    return modelClass(**parsedRepaired[0])
            except ValidationError as e:
                logger.error(f"Validation error parsing repaired {modelClass.__name__}: {e}")
                raise ValueError(f"Invalid repaired data for {modelClass.__name__}: {e}") from e
            except Exception as e:
                # Best-effort: fall through to the final error below
                logger.error(f"Error creating {modelClass.__name__} from repaired JSON: {e}")

    # Step 4: All parsing failed. Guard the preview: `cleaned` may be None when
    # tryParseJson fails before producing cleaned text (previously a TypeError here).
    preview = (cleaned if cleaned else extractedJson)[:200]
    logger.error(f"Failed to parse JSON for {modelClass.__name__}. Cleaned JSON preview: {preview}...")
    raise ValueError(f"Failed to parse or validate JSON for {modelClass.__name__}. JSON may be malformed or incomplete.")