gateway/modules/shared/jsonUtils.py
2025-10-29 00:38:57 +01:00

356 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
logger = logging.getLogger(__name__)
def stripCodeFences(text: str) -> str:
    """Drop a surrounding ```-style code fence (e.g. ```json ... ```) if present."""
    if not text:
        return text
    trimmed = text.strip()
    if not (trimmed.startswith("```") and trimmed.endswith("```")):
        return trimmed
    # The opening fence line may carry a language tag such as 'json';
    # everything up to (and including) the first newline belongs to it.
    fence_end = trimmed.find('\n', 3)
    if fence_end != -1:
        trimmed = trimmed[fence_end + 1:]
    # Remove the closing fence, then any whitespace it left behind.
    if trimmed.endswith("```"):
        trimmed = trimmed[:-3]
    return trimmed.strip()
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring; otherwise return trimmed input.

    NOTE(review): the scanner does not skip string literals, so brackets that
    appear inside quoted strings are counted as structure — acceptable for the
    typical LLM-output payloads this module targets.
    """
    if not text:
        return text
    s = text.strip()
    # Start at the earliest '{' or '[' present in the text.
    candidates = [pos for pos in (s.find('{'), s.find('[')) if pos != -1]
    if not candidates:
        return s
    start = min(candidates)
    # Track nesting with a simple stack of openers.
    openers: List[str] = []
    for idx in range(start, len(s)):
        ch = s[idx]
        if ch == '{' or ch == '[':
            openers.append(ch)
        elif ch == '}' or ch == ']':
            if not openers:
                continue
            top = openers.pop()
            # Mismatched pair: be lenient — discard the opener and keep going.
            if (top == '{') != (ch == '}'):
                continue
            if not openers:
                return s[start:idx + 1].strip()
    return s
def normalizeJsonText(text: str) -> str:
    """Light normalization: remove a UTF-8 BOM and convert smart quotes to ASCII.

    Args:
        text: Raw text that may carry a BOM or typographic quotes.

    Returns:
        The normalized text; falsy input is returned unchanged.
    """
    if not text:
        return text
    s = text
    # Remove UTF-8 BOM if present
    if s.startswith('\ufeff'):
        s = s.lstrip('\ufeff')
    # Normalize typographic ("smart") quotes to straight quotes.
    # BUG FIX: the replacement sources had degraded to empty strings, and
    # str.replace('', x) inserts x between every character of the input.
    # Use explicit Unicode escapes for the intended quote characters.
    s = (s.replace('\u201c', '"')    # left double quotation mark
          .replace('\u201d', '"')    # right double quotation mark
          .replace('\u2018', "'")    # left single quotation mark
          .replace('\u2019', "'"))   # right single quotation mark
    return s
def extractJsonString(text: str) -> str:
    """Normalize, strip code fences, then return the first balanced JSON substring."""
    cleaned = normalizeJsonText(text)
    cleaned = stripCodeFences(cleaned)
    return extractFirstBalancedJson(cleaned).strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Extract and parse JSON from *text*; return (obj, error, cleaned_str).

    Exactly one of obj/error is non-None. Bytes input is decoded as UTF-8
    with replacement characters before cleaning.
    """
    if isinstance(text, bytes):
        try:
            text = text.decode('utf-8', errors='replace')
        except Exception:
            # Defensive fallback; decoding with errors='replace' should not raise.
            text = str(text)
    cleaned = extractJsonString(text or "")
    try:
        parsed = json.loads(cleaned)
    except Exception as exc:
        return None, exc, cleaned
    return parsed, None, cleaned
def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]:
    """Parse JSON from *text*, logging and re-raising the parse error on failure."""
    obj, err, cleaned = tryParseJson(text)
    if err is None:
        return obj
    logger.error(f"parse_json_or_raise failed: {err}. Cleaned preview: {cleaned[:200]}...")
    raise err
def mergeRootLists(json_parts: List[Union[str, Dict, List]]) -> Dict[str, Any]:
    """
    Generic merger for root-level lists: take the first dict as base; for each
    subsequent part:
    - if value is a list and the same key exists in base as a list, extend it
    - if the key is absent, add it
    - for non-list keys, keep the original (from the first part)
    Sets continuation=None if present in the base.

    Args:
        json_parts: Raw JSON strings and/or already-parsed dicts/lists.
            Unparseable strings and non-dict roots are skipped.

    Returns:
        The merged dict, or {} when no dict part was found.
    """
    parsed: List[Dict[str, Any]] = []
    for part in json_parts:
        if isinstance(part, (dict, list)):
            obj = part
        else:
            obj, err, _ = tryParseJson(part)
            if err is not None or not isinstance(obj, (dict, list)):
                continue
        if isinstance(obj, dict):
            parsed.append(obj)
    if not parsed:
        return {}
    base: Dict[str, Any] = dict(parsed[0])
    for obj in parsed[1:]:
        for k, v in obj.items():
            if isinstance(v, list) and isinstance(base.get(k), list):
                # BUG FIX: dict(parsed[0]) is a shallow copy, so extending
                # base[k] in place mutated the caller's input lists. Rebind
                # to a fresh concatenated list instead.
                base[k] = list(base[k]) + v
            elif k not in base:
                base[k] = v
    if 'continuation' in base:
        base['continuation'] = None
    return base
def repairBrokenJson(text: str) -> Optional[Dict[str, Any]]:
    """
    Attempt to repair broken JSON using multiple strategies.
    Returns the best repair attempt or None if all fail.

    Strategies, in order:
      1. Progressive parsing: walk backwards from the full text; for each
         prefix, balance the brackets and try to parse. First dict wins.
      2. Structure closing: balance brackets on the whole text and re-parse.
         NOTE(review): strategy 1's first iteration (i == len(text)) performs
         exactly this computation, so this branch appears unreachable as a
         distinct success path — confirm before removing.
      3. Regex extraction of section objects; the result is wrapped in the
         {"documents": [{"sections": [...]}]} shape.

    Args:
        text: Possibly truncated or corrupted JSON text.

    Returns:
        A parsed dict on success, else None.
    """
    if not text:
        return None
    # Strategy 1: Progressive parsing - try to find longest valid prefix.
    # NOTE(review): O(n^2) worst case on unparseable input, since every
    # prefix is cleaned and re-parsed; presumably acceptable for typical
    # response sizes — confirm against real payloads.
    best_result = None
    best_valid_length = 0
    for i in range(len(text), 0, -1):
        test_str = text[:i]
        closed_str = _closeJsonStructures(test_str)
        obj, err, _ = tryParseJson(closed_str)
        # Only a dict root counts as a successful repair.
        if err is None and isinstance(obj, dict):
            best_result = obj
            best_valid_length = i
            logger.debug(f"Progressive parsing success at length {i}")
            break
    if best_result:
        logger.info(f"Repaired JSON using progressive parsing (valid length: {best_valid_length})")
        return best_result
    # Strategy 2: Structure closing - close incomplete structures
    closed_str = _closeJsonStructures(text)
    obj, err, _ = tryParseJson(closed_str)
    if err is None and isinstance(obj, dict):
        logger.info("Repaired JSON using structure closing")
        return obj
    # Strategy 3: Regex extraction (fallback for completely broken JSON)
    extracted = _extractSectionsRegex(text)
    if extracted:
        logger.info("Repaired JSON using regex extraction")
        return {"documents": [{"sections": extracted}]}
    logger.warning("All repair strategies failed")
    return None
def _closeJsonStructures(text: str) -> str:
"""
Close incomplete JSON structures by adding missing closing brackets.
"""
if not text:
return text
# Count open/close brackets and braces
open_braces = text.count('{')
close_braces = text.count('}')
open_brackets = text.count('[')
close_brackets = text.count(']')
# Close incomplete structures
result = text
for _ in range(open_braces - close_braces):
result += '}'
for _ in range(open_brackets - close_brackets):
result += ']'
return result
def _extractSectionsRegex(text: str) -> List[Dict[str, Any]]:
"""
Extract sections from broken JSON using regex patterns.
Fallback strategy when JSON is completely corrupted.
"""
import re
sections = []
# Pattern to find section objects
section_pattern = r'"id"\s*:\s*"(section_\d+)"\s*,?\s*"content_type"\s*:\s*"(\w+)"\s*,?\s*"order"\s*:\s*(\d+)'
for match in re.finditer(section_pattern, text, re.IGNORECASE):
section_id = match.group(1)
content_type = match.group(2)
order = int(match.group(3))
# Try to extract elements array
elements_match = re.search(
r'"elements"\s*:\s*\[(.*?)\]',
text[match.end():match.end()+500] # Look ahead for elements
)
elements = []
if elements_match:
try:
elements_str = '[' + elements_match.group(1) + ']'
elements = json.loads(elements_str)
except:
pass
sections.append({
"id": section_id,
"content_type": content_type,
"elements": elements,
"order": order
})
return sections
def extractSectionsFromDocument(documentData: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect every section from a document payload.

    Supports both the nested {"documents": [{"sections": [...]}]} layout and
    a flat {"sections": [...]} root; non-dict input yields [].
    """
    if not isinstance(documentData, dict):
        return []
    # Nested layout: gather sections across all documents.
    if "documents" in documentData:
        collected: List[Dict[str, Any]] = []
        for document in documentData.get("documents", []):
            if not isinstance(document, dict):
                continue
            candidate = document.get("sections")
            if isinstance(candidate, list):
                collected.extend(candidate)
        return collected
    # Flat layout: sections directly at the root.
    root_sections = documentData.get("sections")
    if isinstance(root_sections, list):
        return root_sections
    return []
def extractContentSample(section: Dict[str, Any]) -> str:
    """
    Build a short human-readable sample of a section's trailing content,
    used as context in a continuation prompt.

    Returns "" for non-dict input and the generic "Content exists" when no
    type-specific sample can be produced.
    """
    if not isinstance(section, dict):
        return ""
    kind = section.get("content_type", "").lower()
    elements = section.get("elements", [])
    if not elements or not isinstance(elements, list):
        return "Content exists"
    # Only the tail of the element list matters for continuation context.
    tail = elements[-5:]
    if kind == "list":
        texts = [e.get("text", "") for e in tail if isinstance(e, dict) and "text" in e]
        if texts:
            return f"Last {len(texts)} items: {', '.join(texts[:3])}"
    elif kind == "paragraph":
        for e in tail:
            if isinstance(e, dict) and "text" in e:
                body = e.get("text", "")
                if len(body) > 150:
                    body = "..." + body[-150:]
                return f"Last content: {body}"
    elif kind == "code":
        for e in tail:
            if isinstance(e, dict) and "code" in e:
                code_lines = e.get("code", "").split('\n')
                if len(code_lines) > 5:
                    return f"Last lines ({len(code_lines)} total): {', '.join(code_lines[-3:])}"
                return f"Code ({len(code_lines)} lines)"
    elif kind == "table":
        for e in tail:
            if isinstance(e, dict) and "rows" in e:
                return f"Table with {len(e.get('rows', []))} rows"
    return "Content exists"
def buildContinuationContext(allSections: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Summarize already-generated sections so a continuation prompt knows where
    to resume: counts, last section id/type, sample, and the next order number.
    """
    if not allSections:
        return {
            "section_count": 0,
            "next_order": 1,
            "last_content_sample": "No content yet"
        }
    # Stable sort, then take the final entry: with ties on 'order', the
    # latest occurrence in the input wins.
    ordered = sorted(allSections, key=lambda sec: sec.get("order", 0))
    latest = ordered[-1]
    latest_order = latest.get("order", 0)
    return {
        "section_count": len(allSections),
        "last_section_id": latest.get("id", ""),
        "last_order": latest_order,
        "next_order": latest_order + 1,
        "last_content_type": latest.get("content_type", ""),
        "last_content_sample": extractContentSample(latest)
    }