# JSON parsing, repair, and continuation-context utilities for LLM output.
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
logger = logging.getLogger(__name__)
def stripCodeFences(text: str) -> str:
    """Remove ```json / ``` fences and surrounding whitespace if present."""
    if not text:
        return text
    s = text.strip()
    if not (s.startswith("```") and s.endswith("```")):
        return s
    # Drop the opening fence line (``` plus an optional language tag like
    # 'json'); everything after the first newline is the fenced payload.
    _head, sep, rest = s.partition('\n')
    if sep:
        s = rest
    # Drop the closing fence, then trim what remains.
    if s.endswith("```"):
        return s[:-3].strip()
    return s
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring; otherwise return trimmed input."""
    if not text:
        return text
    s = text.strip()
    # Start at the earliest '{' or '[' present in the text.
    positions = [p for p in (s.find('{'), s.find('[')) if p != -1]
    if not positions:
        return s
    start = min(positions)
    # Walk forward tracking open delimiters; return when the first opener
    # is matched. Mismatched or stray closers are skipped, mirroring a
    # lenient scan over possibly-malformed text.
    closer_for = {'{': '}', '[': ']'}
    pending: List[str] = []
    for idx, ch in enumerate(s[start:], start):
        if ch in closer_for:
            pending.append(ch)
        elif ch in ('}', ']'):
            if not pending:
                continue
            top = pending.pop()
            if closer_for[top] != ch:
                continue
            if not pending:
                return s[start:idx + 1].strip()
    # Never balanced: fall back to the trimmed input.
    return s
def normalizeJsonText(text: str) -> str:
    """Light normalization: remove BOM, normalize smart quotes."""
    if not text:
        return text
    result = text
    # Drop a leading UTF-8 BOM if present.
    if result.startswith('\ufeff'):
        result = result.lstrip('\ufeff')
    # Map curly quotes to their straight ASCII equivalents in one pass.
    quote_map = str.maketrans({'“': '"', '”': '"', '’': "'", '‘': "'"})
    return result.translate(quote_map)
def extractJsonString(text: str) -> str:
    """Strip code fences, normalize, then extract first balanced JSON substring."""
    # Pipeline: normalize characters -> drop fences -> isolate balanced JSON.
    cleaned = extractFirstBalancedJson(stripCodeFences(normalizeJsonText(text)))
    return cleaned.strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Extract and parse JSON; return (obj, error, cleaned_str)."""
    # Accept bytes defensively: decode with replacement, fall back to repr.
    if isinstance(text, bytes):
        try:
            text = text.decode('utf-8', errors='replace')
        except Exception:
            text = str(text)
    cleaned = extractJsonString(text or "")
    try:
        parsed = json.loads(cleaned)
    except Exception as exc:
        return None, exc, cleaned
    return parsed, None, cleaned
def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]:
    """Parse JSON via tryParseJson; log and re-raise the parse error on failure."""
    obj, err, cleaned = tryParseJson(text)
    if err is None:
        return obj
    logger.error(f"parse_json_or_raise failed: {err}. Cleaned preview: {cleaned[:200]}...")
    raise err
def mergeRootLists(jsonParts: List[Union[str, Dict, List]]) -> Dict[str, Any]:
    """
    Generic merger for root-level lists.

    Takes the first dict part as base; for each subsequent part:
    - if value is a list and the same key exists as a list, concatenate
    - if the key is absent, add it
    - for non-list keys, keep the original (from the first part)
    Sets continuation=None if present in base.

    String parts are parsed with tryParseJson; unparseable parts and
    top-level JSON arrays are skipped (only dict roots participate).
    Input dicts are never mutated. Returns {} when nothing usable parses.
    """
    parsed: List[Dict[str, Any]] = []
    for part in jsonParts:
        if isinstance(part, (dict, list)):
            obj = part
        else:
            obj, err, _ = tryParseJson(part)
            if err is not None or not isinstance(obj, (dict, list)):
                continue
        # Only dict roots are merged; a bare top-level list has no key to merge under.
        if isinstance(obj, dict):
            parsed.append(obj)

    if not parsed:
        return {}

    base: Dict[str, Any] = dict(parsed[0])
    for obj in parsed[1:]:
        for k, v in obj.items():
            if isinstance(v, list) and isinstance(base.get(k), list):
                # Build a new list instead of extend(): dict(parsed[0]) is a
                # shallow copy, so extending in place would mutate the
                # caller's first input dict.
                base[k] = base[k] + v
            elif k not in base:
                base[k] = v

    if 'continuation' in base:
        base['continuation'] = None
    return base
def repairBrokenJson(text: str) -> Optional[Dict[str, Any]]:
    """
    Attempt to repair broken JSON using multiple strategies.

    Generic solution that works for any content type.
    Returns the best repair attempt or None if all fail.

    Strategies, in order:
      1. Regex section extraction over the whole text (wraps hits in a
         synthetic metadata/documents envelope).
      2. Progressive prefix parsing: find the longest prefix that parses
         after bracket-closing, then salvage sections from the remainder.
      3. Bracket-closing over the entire text.
    """
    if not text:
        return None

    # Strategy 1: Try to extract sections from the entire text first.
    # This handles cases where the JSON structure is broken but content is intact.
    extractedSections = _extractSectionsRegex(text)
    if extractedSections:
        logger.info(f"Extracted {len(extractedSections)} sections using regex")
        # Synthetic envelope matching the expected document schema.
        return {
            "metadata": {
                "split_strategy": "single_document",
                "source_documents": [],
                "extraction_method": "ai_generation"
            },
            "documents": [{"sections": extractedSections}]
        }

    # Strategy 2: Progressive parsing - try to find longest valid prefix.
    bestResult = None
    bestValidLength = 0

    # Coarse-to-fine step sizes: the first step size that yields any valid
    # prefix wins (the inner loop scans from the full length downwards).
    # NOTE(review): worst case this is O(n) parse attempts of O(n) text.
    for stepSize in [100, 50, 10, 1]:
        for i in range(len(text), 0, -stepSize):
            testStr = text[:i]
            closedStr = _closeJsonStructures(testStr)
            obj, err, _ = tryParseJson(closedStr)
            if err is None and isinstance(obj, dict):
                bestResult = obj
                bestValidLength = i
                logger.debug(f"Progressive parsing success at length {i} (step: {stepSize})")
                break
        if bestResult:
            break

    if bestResult:
        logger.info(f"Repaired JSON using progressive parsing (valid length: {bestValidLength})")

        # Check if we have sections in the result.
        sections = extractSectionsFromDocument(bestResult)
        if sections:
            logger.info(f"Progressive parsing found {len(sections)} sections")
            return bestResult
        else:
            # No sections found in progressive parsing, try to extract from the
            # broken tail (the part after the longest valid prefix).
            logger.info("Progressive parsing found no sections, trying to extract from broken part")
            extractedSections = _extractSectionsRegex(text[bestValidLength:])
            if extractedSections:
                logger.info(f"Extracted {len(extractedSections)} sections from broken part")
                # Merge with the valid part, creating the documents scaffold if needed.
                if "documents" not in bestResult:
                    bestResult["documents"] = []
                if not bestResult["documents"]:
                    bestResult["documents"] = [{"sections": []}]
                bestResult["documents"][0]["sections"].extend(extractedSections)
            # Return the progressive result whether or not the tail added sections.
            return bestResult

    # Strategy 3: Structure closing - close incomplete structures over the full text.
    closedStr = _closeJsonStructures(text)
    obj, err, _ = tryParseJson(closedStr)
    if err is None and isinstance(obj, dict):
        logger.info("Repaired JSON using structure closing")
        return obj

    logger.warning("All repair strategies failed")
    return None
def _closeJsonStructures(text: str) -> str:
|
||
"""
|
||
Close incomplete JSON structures by adding missing closing brackets.
|
||
"""
|
||
if not text:
|
||
return text
|
||
|
||
# Count open/close brackets and braces
|
||
openBraces = text.count('{')
|
||
closeBraces = text.count('}')
|
||
openBrackets = text.count('[')
|
||
closeBrackets = text.count(']')
|
||
|
||
# Close incomplete structures
|
||
result = text
|
||
for _ in range(openBraces - closeBraces):
|
||
result += '}'
|
||
for _ in range(openBrackets - closeBrackets):
|
||
result += ']'
|
||
|
||
return result
|
||
|
||
|
||
def _extractSectionsRegex(text: str) -> List[Dict[str, Any]]:
|
||
"""
|
||
Extract sections from broken JSON using regex patterns.
|
||
Generic solution that works for any content type.
|
||
"""
|
||
import re
|
||
|
||
sections = []
|
||
|
||
# Pattern to find section objects
|
||
sectionPattern = r'"id"\s*:\s*"(section_\d+)"\s*,?\s*"content_type"\s*:\s*"(\w+)"\s*,?\s*"order"\s*:\s*(\d+)'
|
||
|
||
for match in re.finditer(sectionPattern, text, re.IGNORECASE):
|
||
sectionId = match.group(1)
|
||
contentType = match.group(2)
|
||
order = int(match.group(3))
|
||
|
||
# Try to extract elements array - look for the elements array after this section
|
||
elementsMatch = re.search(
|
||
r'"elements"\s*:\s*\[(.*?)\]',
|
||
text[match.end():match.end()+5000] # Look ahead for elements (large range)
|
||
)
|
||
|
||
elements = []
|
||
if elementsMatch:
|
||
try:
|
||
elementsStr = '[' + elementsMatch.group(1) + ']'
|
||
elements = json.loads(elementsStr)
|
||
except:
|
||
# If JSON parsing fails, try to extract individual items manually
|
||
elementsText = elementsMatch.group(1)
|
||
elements = _extractElementsFromText(elementsText, contentType)
|
||
|
||
sections.append({
|
||
"id": sectionId,
|
||
"content_type": contentType,
|
||
"elements": elements,
|
||
"order": order
|
||
})
|
||
|
||
# If no sections found with the main pattern, try to find any content patterns
|
||
if not sections:
|
||
sections = _extractGenericContent(text)
|
||
|
||
return sections
|
||
|
||
|
||
def _extractElementsFromText(elementsText: str, contentType: str) -> List[Dict[str, Any]]:
    """
    Extract elements from text when JSON parsing fails.

    Generic approach that works for any content type.
    Handles incomplete strings and corrupted data.
    Excludes the last incomplete item to prevent corrupted data.

    Each content_type branch pairs a "complete item" regex with an
    "incomplete item" regex (value cut off at a newline or end of text),
    de-duplicates while preserving order via dict.fromkeys, then trims a
    possibly-truncated final item with _removeLastIncompleteItem.
    """
    import re

    elements = []

    if contentType == "list":
        # Look for {"text": "..."} patterns, including incomplete ones
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', elementsText)
        # Also look for incomplete patterns like {"text": "36
        incomplete_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)

        # Combine both complete and incomplete items
        all_items = text_items + incomplete_items
        # Remove duplicates (order-preserving) and empty strings
        unique_items = list(dict.fromkeys([item for item in all_items if item.strip()]))

        # Remove the last item if it appears to be incomplete/corrupted
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)

        elements = [{"text": item} for item in unique_items]

    elif contentType == "paragraph":
        # Same extraction as "list": both element shapes are {"text": ...}.
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', elementsText)
        incomplete_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)

        all_items = text_items + incomplete_items
        unique_items = list(dict.fromkeys([item for item in all_items if item.strip()]))

        # Remove the last item if it appears to be incomplete/corrupted
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)

        elements = [{"text": item} for item in unique_items]

    elif contentType == "heading":
        # Look for {"level": X, "text": "..."} patterns, including incomplete ones
        heading_items = re.findall(r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*)"\}', elementsText)
        incomplete_heading_items = re.findall(r'\{"level"\s*:\s*(\d+)\s*,\s*"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)

        all_items = heading_items + incomplete_heading_items
        unique_items = list(dict.fromkeys([(int(level), text) for level, text in all_items if text.strip()]))

        # NOTE(review): this passes a list of (level, text) tuples into a
        # helper annotated List[str]; it works because the helper only
        # slices the list, but the annotation is looser than the usage.
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)

        elements = [{"level": level, "text": text} for level, text in unique_items]

    elif contentType == "table":
        # Look for full table objects: headers + rows + caption in that order.
        table_items = re.findall(r'\{"headers"\s*:\s*\[(.*?)\]\s*,\s*"rows"\s*:\s*\[(.*?)\]\s*,\s*"caption"\s*:\s*"([^"]*)"\}', elementsText)
        for headers_str, rows_str, caption in table_items:
            # Extract headers
            headers = re.findall(r'"([^"]+)"', headers_str)
            # Extract rows (simplified: one bracketed group per row)
            rows = []
            row_matches = re.findall(r'\[(.*?)\]', rows_str)
            for row_match in row_matches:
                row_items = re.findall(r'"([^"]+)"', row_match)
                rows.append(row_items)

            elements.append({
                "headers": headers,
                "rows": rows,
                "caption": caption
            })

    elif contentType == "code":
        # Look for {"code": "...", "language": "..."} patterns, including incomplete ones
        code_items = re.findall(r'\{"code"\s*:\s*"([^"]*)"\s*,\s*"language"\s*:\s*"([^"]*)"\}', elementsText)
        incomplete_code_items = re.findall(r'\{"code"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)

        # Incomplete snippets get a placeholder language of "unknown".
        all_items = code_items + [(code, "unknown") for code in incomplete_code_items]
        unique_items = list(dict.fromkeys([(code, lang) for code, lang in all_items if code.strip()]))

        # Remove the last item if it appears to be incomplete/corrupted
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)

        elements = [{"code": code, "language": lang} for code, lang in unique_items]

    else:
        # Generic fallback - look for any "text": "..." content (not requiring
        # the surrounding braces), including incomplete values.
        text_items = re.findall(r'"text"\s*:\s*"([^"]*)"', elementsText)
        incomplete_text_items = re.findall(r'"text"\s*:\s*"([^"]*?)(?:\n|$)', elementsText)

        all_items = text_items + incomplete_text_items
        unique_items = list(dict.fromkeys([item for item in all_items if item.strip()]))

        # Remove the last item if it appears to be incomplete/corrupted
        if unique_items:
            unique_items = _removeLastIncompleteItem(unique_items, elementsText)

        elements = [{"text": item} for item in unique_items]

    return elements
def _removeLastIncompleteItem(items: List[str], original_text: str) -> List[str]:
|
||
"""
|
||
Remove the last item if it appears to be incomplete/corrupted.
|
||
This prevents corrupted data from being included in the final result.
|
||
"""
|
||
import re
|
||
|
||
if not items:
|
||
return items
|
||
|
||
# Check if the original text ends with incomplete JSON patterns
|
||
# Look for patterns that suggest the last item was cut off
|
||
|
||
# Pattern 1: Text ends with incomplete string like {"text": "36
|
||
if re.search(r'\{"[^"]*"\s*:\s*"[^"]*$', original_text):
|
||
logger.debug("Detected incomplete string at end - removing last item")
|
||
return items[:-1]
|
||
|
||
# Pattern 2: Text ends with incomplete boolean like {"bool_flag": tr
|
||
if re.search(r'\{"[^"]*"\s*:\s*(true|false|tr|fa)$', original_text):
|
||
logger.debug("Detected incomplete boolean at end - removing last item")
|
||
return items[:-1]
|
||
|
||
# Pattern 3: Text ends with incomplete number like {"number": 123
|
||
if re.search(r'\{"[^"]*"\s*:\s*\d+$', original_text):
|
||
logger.debug("Detected incomplete number at end - removing last item")
|
||
return items[:-1]
|
||
|
||
# Pattern 4: Text ends with incomplete array like {"array": [1,2,3
|
||
if re.search(r'\{"[^"]*"\s*:\s*\[[^\]]*$', original_text):
|
||
logger.debug("Detected incomplete array at end - removing last item")
|
||
return items[:-1]
|
||
|
||
# Pattern 5: Text ends with incomplete object like {"obj": {"key": "val
|
||
if re.search(r'\{"[^"]*"\s*:\s*\{[^}]*$', original_text):
|
||
logger.debug("Detected incomplete object at end - removing last item")
|
||
return items[:-1]
|
||
|
||
# Pattern 6: Text ends with trailing comma (common sign of incomplete JSON)
|
||
if original_text.rstrip().endswith(','):
|
||
logger.debug("Detected trailing comma - removing last item")
|
||
return items[:-1]
|
||
|
||
# If no incomplete patterns detected, return all items
|
||
return items
|
||
|
||
|
||
def _extractGenericContent(text: str) -> List[Dict[str, Any]]:
    """
    Extract generic content when no specific section patterns are found.

    This handles cases where the JSON structure is completely broken.
    Handles incomplete strings and corrupted data.
    Excludes the last incomplete item to prevent corrupted data.

    At most one synthetic section (id "section_1", order 1) is produced,
    from the first of three patterns that yields anything:
      1. complete/incomplete {"text": ...} objects -> a "list" section
      2. the same objects matched more loosely    -> a "paragraph" section
      3. any quoted strings >= 3 chars            -> a "paragraph" section
    """
    import re

    sections = []

    # Look for any structured content patterns
    # Pattern 1: Look for list items {"text": "..."}, including incomplete ones
    list_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
    incomplete_list_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)

    all_list_items = list_items + incomplete_list_items
    # Order-preserving de-duplication, dropping blank matches.
    unique_list_items = list(dict.fromkeys([item for item in all_list_items if item.strip()]))

    # Remove the last item if it appears to be incomplete/corrupted
    if unique_list_items:
        unique_list_items = _removeLastIncompleteItem(unique_list_items, text)

    if unique_list_items:
        elements = [{"text": item} for item in unique_list_items]
        sections.append({
            "id": "section_1",
            "content_type": "list",
            "elements": elements,
            "order": 1
        })

    # Pattern 2: Look for paragraph text {"text": "..."}, including incomplete ones
    # (only reached when pattern 1 produced nothing after trimming)
    elif re.search(r'\{"text"\s*:\s*"[^"]*\}', text):
        # Extract all text elements, including incomplete ones
        text_items = re.findall(r'\{"text"\s*:\s*"([^"]*)"\}', text)
        incomplete_text_items = re.findall(r'\{"text"\s*:\s*"([^"]*?)(?:\n|$)', text)

        all_text_items = text_items + incomplete_text_items
        unique_text_items = list(dict.fromkeys([item for item in all_text_items if item.strip()]))

        # Remove the last item if it appears to be incomplete/corrupted
        if unique_text_items:
            unique_text_items = _removeLastIncompleteItem(unique_text_items, text)

        if unique_text_items:
            elements = [{"text": item} for item in unique_text_items]
            sections.append({
                "id": "section_1",
                "content_type": "paragraph",
                "elements": elements,
                "order": 1
            })

    # Pattern 3: Look for any quoted strings that might be content, including incomplete ones
    elif re.search(r'"([^"]{3,})"', text):  # Strings longer than 3 chars (reduced threshold)
        # Extract longer quoted strings, including incomplete ones
        text_items = re.findall(r'"([^"]{3,})"', text)
        incomplete_text_items = re.findall(r'"([^"]{3,}?)(?:\n|$)', text)

        all_text_items = text_items + incomplete_text_items
        # Filter out likely JSON keys (schema field names, section/doc ids)
        content_items = [item for item in all_text_items if not item.startswith(('section_', 'doc_', 'metadata', 'split_strategy', 'source_documents', 'extraction_method', 'id', 'content_type', 'elements', 'order', 'title', 'filename'))]

        # Remove the last item if it appears to be incomplete/corrupted
        if content_items:
            content_items = _removeLastIncompleteItem(content_items, text)

        if content_items:
            elements = [{"text": item} for item in content_items[:10]]  # Limit to first 10 items
            sections.append({
                "id": "section_1",
                "content_type": "paragraph",
                "elements": elements,
                "order": 1
            })

    return sections
def extractSectionsFromDocument(documentData: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect every section from parsed document data.

    Supports both the nested shape {"documents": [{"sections": [...]}]}
    and the flat shape {"sections": [...]}; returns [] for anything else.
    """
    if not isinstance(documentData, dict):
        return []

    # Nested layout: concatenate the sections of every document entry.
    if "documents" in documentData:
        collected: List[Dict[str, Any]] = []
        for doc in documentData.get("documents", []):
            if isinstance(doc, dict):
                doc_sections = doc.get("sections")
                if isinstance(doc_sections, list):
                    collected.extend(doc_sections)
        return collected

    # Flat layout: sections directly at the root.
    root_sections = documentData.get("sections")
    if isinstance(root_sections, list):
        return root_sections

    return []
def extractContentSample(section: Dict[str, Any]) -> str:
    """
    Summarize the tail of a section's content for continuation context.

    Formats a short description of the last few elements according to the
    section's content_type; returns "" for non-dict input and
    "Content exists" when nothing more specific can be said.
    """
    if not isinstance(section, dict):
        return ""

    kind = section.get("content_type", "").lower()
    elements = section.get("elements", [])

    if not elements or not isinstance(elements, list):
        return "Content exists"

    # Sample at most the last five elements.
    tail = elements[-5:]

    if kind == "list":
        texts = [e.get("text", "") for e in tail if isinstance(e, dict) and "text" in e]
        if texts:
            return f"Last {len(texts)} items: {', '.join(texts[:3])}"
    elif kind == "paragraph":
        # Report the first sampled element carrying text, tail-truncated.
        for e in tail:
            if isinstance(e, dict) and "text" in e:
                body = e.get("text", "")
                if len(body) > 150:
                    body = "..." + body[-150:]
                return f"Last content: {body}"
    elif kind == "code":
        for e in tail:
            if isinstance(e, dict) and "code" in e:
                code_lines = e.get("code", "").split('\n')
                if len(code_lines) > 5:
                    return f"Last lines ({len(code_lines)} total): {', '.join(code_lines[-3:])}"
                return f"Code ({len(code_lines)} lines)"
    elif kind == "table":
        for e in tail:
            if isinstance(e, dict) and "rows" in e:
                return f"Table with {len(e.get('rows', []))} rows"

    return "Content exists"
def _buildDetailedContinuationInfo(section: Dict[str, Any], content_type: str) -> Dict[str, Any]:
    """
    Build detailed continuation information for better AI guidance.

    Completely generic - works for any content type (list, paragraph,
    code, table, etc.). Only the item count and guidance wording differ
    between an empty and a populated section.
    """
    elements = section.get("elements", [])
    sample = extractContentSample(section)

    if not elements:
        count = 0
        guidance = "Continue generating content in the same format and style."
    else:
        count = len(elements)
        # Tell the AI to generate ALL remaining content, not just one more chunk.
        guidance = (
            "Generate ALL remaining content to complete the user's request. "
            "Continue from where you left off and finish everything that was requested."
        )

    return {
        "type": "continue_general",
        "sample": sample,
        "last_item": "",
        "item_count": count,
        "guidance": guidance,
    }
def _extractLastItemsFromFragment(fragment: str, max_items: int = 10) -> str:
    """
    Extract the last few items from a JSON fragment for continuation context.

    Uses JSON structure (sections -> elements -> items) - fully generic.
    Works with broken/incomplete JSON by trying to parse and extract sections.

    Four fallback strategies run in order; the first that yields text wins:
      1. repair + structural extraction over the whole fragment
      2. bracket-close and parse progressively shorter suffixes
      3. raw regex scan for {"text": "..."} near the end
      4. give up and return "" (caller falls back to accumulated sections)
    Returns a comma-joined string of up to max_items item texts.
    """
    if not fragment:
        return ""

    # Strategy 1: Try to parse as JSON and extract from structure
    try:
        # Try to repair and parse the fragment
        parsed = repairBrokenJson(fragment)
        if parsed:
            # Extract sections from parsed JSON using structure
            sections = extractSectionsFromDocument(parsed)
            if sections:
                # Get the last section (likely where continuation should happen)
                sorted_sections = sorted(sections, key=lambda s: s.get("order", 0))
                last_section = sorted_sections[-1]
                elements = last_section.get("elements", [])

                if elements and isinstance(elements, list):
                    content_type = last_section.get("content_type", "").lower()

                    # For list content_type, extract from items array
                    if content_type == "list" and len(elements) > 0:
                        last_element = elements[-1]
                        if isinstance(last_element, dict):
                            # Check if it has an "items" array (list structure)
                            if "items" in last_element and isinstance(last_element["items"], list):
                                items_list = last_element["items"]
                                if items_list:
                                    # Get last max_items from this items array
                                    last_items = items_list[-max_items:] if len(items_list) > max_items else items_list
                                    # Extract text from each item
                                    texts = []
                                    for item in last_items:
                                        if isinstance(item, dict) and "text" in item:
                                            texts.append(str(item["text"]))
                                    if texts:
                                        return ', '.join(texts)

                            # Or if elements themselves are items (alternative structure)
                            elif "text" in last_element:
                                # Get last max_items elements that have text
                                elements_with_text = [e for e in elements if isinstance(e, dict) and "text" in e]
                                if elements_with_text:
                                    last_elements = elements_with_text[-max_items:] if len(elements_with_text) > max_items else elements_with_text
                                    texts = [str(e.get("text", "")) for e in last_elements]
                                    if texts:
                                        return ', '.join(texts)

                    # For other content types, extract from elements
                    elif len(elements) > 0:
                        # Get last max_items elements that have text/code
                        valid_elements = [e for e in elements if isinstance(e, dict) and ("text" in e or "code" in e)]
                        if valid_elements:
                            last_elements = valid_elements[-max_items:] if len(valid_elements) > max_items else valid_elements
                            texts = []
                            for elem in last_elements:
                                if "text" in elem:
                                    texts.append(str(elem["text"]))
                                elif "code" in elem:
                                    # For code, show a truncated snippet
                                    code = str(elem["code"])
                                    texts.append(code[:50] + "..." if len(code) > 50 else code)
                            if texts:
                                return ', '.join(texts)
    except Exception as e:
        logger.debug(f"Could not extract items from fragment using JSON structure: {e}")

    # Strategy 2: If parsing failed, try progressive parsing from the end
    # Look for the last complete JSON structures near the end
    try:
        # Try parsing different lengths from the end (largest window first)
        for length in [3000, 2000, 1000, 500]:
            if len(fragment) > length:
                end_portion = fragment[-length:]
                closed = _closeJsonStructures(end_portion)
                obj, err, _ = tryParseJson(closed)
                if err is None and isinstance(obj, dict):
                    # Successfully parsed - extract sections
                    sections = extractSectionsFromDocument(obj)
                    if sections:
                        # Same extraction logic as above
                        sorted_sections = sorted(sections, key=lambda s: s.get("order", 0))
                        if sorted_sections:
                            last_section = sorted_sections[-1]
                            elements = last_section.get("elements", [])
                            if elements:
                                # Extract texts using same logic as Strategy 1
                                texts = []
                                for elem in elements[-max_items:]:
                                    if isinstance(elem, dict):
                                        if "items" in elem and isinstance(elem["items"], list):
                                            # Get last item from items array
                                            if elem["items"]:
                                                last_item = elem["items"][-1]
                                                if isinstance(last_item, dict) and "text" in last_item:
                                                    texts.append(str(last_item["text"]))
                                        elif "text" in elem:
                                            texts.append(str(elem["text"]))
                                if texts:
                                    return ', '.join(texts[-max_items:])
    except Exception as e:
        logger.debug(f"Progressive parsing from end failed: {e}")

    # Strategy 3: If all parsing fails, try simple extraction from raw fragment
    # Look for last complete {"text": "..."} pattern near the end
    try:
        # Look at last 2000 chars for the pattern
        end_portion = fragment[-2000:] if len(fragment) > 2000 else fragment
        # Find all {"text": "value"} patterns
        import re
        # NOTE(review): despite the comment below, [^"]+ does NOT handle
        # escaped quotes inside the value - confirm whether that matters here.
        # Pattern to match {"text": "..."} with escaped quotes
        pattern = r'\{"text"\s*:\s*"([^"]+)"\}'
        matches = re.findall(pattern, end_portion)
        if matches:
            # Get last max_items
            last_matches = matches[-max_items:] if len(matches) > max_items else matches
            return ', '.join(last_matches)
    except Exception as e:
        logger.debug(f"Simple pattern extraction failed: {e}")

    # Strategy 4: If all fails, return empty (will use last_item_from_sections)
    return ""
def buildContinuationContext(allSections: List[Dict[str, Any]], lastRawResponse: Optional[str] = None) -> Dict[str, Any]:
    """
    Build context information from accumulated sections for continuation prompt.
    Extracts last items and provides clear continuation point.

    Args:
        allSections: List of sections already generated
        lastRawResponse: Raw JSON response from last iteration (can be broken/incomplete)

    Returns:
        Dict with section_count, last_raw_json, last_items, and continuation point.
        When lastRawResponse is missing/empty, "last_raw_json" and
        "last_items_from_fragment" are absent and the fallback path derives
        "last_item_object" from the accumulated sections instead.
    """
    context: Dict[str, Any] = {
        "section_count": len(allSections),
    }

    # Extract last COMPLETE object directly from raw response (generic - works for any structure)
    # This is extracted BEFORE any merging/accumulation happens
    # Returns the full last complete object like {"text": "..."} or {"code": "...", "language": "..."} etc.
    # Logic: find the last complete {...} where there are no nested { inside (flat object)
    last_complete_object = ""  # Full object as JSON string
    total_items_count = 0

    if lastRawResponse:
        raw_json = stripCodeFences(lastRawResponse.strip())
        if raw_json and raw_json.strip() != "{}":
            # Find last complete flat object (no nested objects inside)
            # Scan from the end backwards to find the last complete {...} object
            # A flat object is complete if: starts with {, ends with }, and has no nested { inside
            # NOTE(review): this brace scan is not string-aware, so braces
            # inside string values could mislead it - confirm acceptable.

            # Work backwards from the end, find last }
            for i in range(len(raw_json) - 1, -1, -1):
                if raw_json[i] == '}':
                    # Found a closing brace, work backwards to find its opening brace
                    depth = 1
                    opening_pos = -1  # NOTE(review): assigned but never read afterwards

                    for j in range(i - 1, -1, -1):
                        if raw_json[j] == '}':
                            depth += 1
                        elif raw_json[j] == '{':
                            depth -= 1
                            if depth == 0:
                                # Found matching opening brace
                                opening_pos = j
                                # Check if this is a flat object (no nested { inside)
                                obj_content = raw_json[j + 1:i]
                                if '{' not in obj_content:
                                    # This is a flat object (no nested objects inside)
                                    last_complete_object = raw_json[j:i + 1]
                                break

                    if last_complete_object:
                        break

            # Also try structure-based parsing for item count
            try:
                parsed = repairBrokenJson(raw_json)
                if parsed:
                    sections = extractSectionsFromDocument(parsed)
                    if sections:
                        sorted_sections = sorted(sections, key=lambda s: s.get("order", 0))
                        last_section = sorted_sections[-1]
                        elements = last_section.get("elements", [])

                        if elements and isinstance(elements, list) and len(elements) > 0:
                            if last_section.get("content_type") == "list":
                                last_element = elements[-1]
                                if isinstance(last_element, dict):
                                    if "items" in last_element and isinstance(last_element["items"], list):
                                        items_list = last_element["items"]
                                        # Only count complete items (those successfully extracted)
                                        total_items_count = len(items_list)
            except Exception as e:
                logger.debug(f"Could not extract item count from raw response structure: {e}")

            # Also extract last items for display (fragment extraction)
            last_items_from_fragment = _extractLastItemsFromFragment(raw_json, max_items=10)

            context["last_raw_json"] = raw_json
            context["last_item_object"] = last_complete_object  # Full last complete object (generic - any structure)
            context["last_items_from_fragment"] = last_items_from_fragment
            context["total_items_count"] = total_items_count  # Count from raw response

            logger.debug(f"Included previous JSON response in continuation context ({len(raw_json)} chars, {total_items_count} items in response, last complete object: {last_complete_object})")
        else:
            logger.warning("lastRawResponse was empty or just '{}' - continuation may not work correctly")
    else:
        # No raw response - fallback to extracting from accumulated sections
        # Extract the last complete object from the last element
        last_item_object_from_sections = ""
        if allSections:
            sorted_sections = sorted(allSections, key=lambda s: s.get("order", 0))
            last_section = sorted_sections[-1]
            elements = last_section.get("elements", [])

            if elements and isinstance(elements, list) and len(elements) > 0:
                # Get the last element (could be any structure - generic)
                last_element = elements[-1]
                if isinstance(last_element, dict):
                    # Try to get items if it's a list structure
                    if "items" in last_element and isinstance(last_element["items"], list):
                        items_list = last_element["items"]
                        total_items_count = len(items_list)
                        if items_list:
                            # Get last item (any structure)
                            last_item = items_list[-1]
                            if isinstance(last_item, dict):
                                # Convert to JSON string (generic - works for any object structure)
                                import json
                                try:
                                    last_item_object_from_sections = json.dumps(last_item)
                                except:
                                    pass
                    else:
                        # Element itself is the object (no items array)
                        total_items_count = len(elements)
                        # Convert to JSON string (generic)
                        import json
                        try:
                            last_item_object_from_sections = json.dumps(last_element)
                        except:
                            pass

        context["last_item_object"] = last_item_object_from_sections
        context["total_items_count"] = total_items_count
        logger.debug(f"No previous raw response available for continuation context (but have {total_items_count} items accumulated, last item object: {last_item_object_from_sections})")

    return context