137 lines
4.2 KiB
Python
137 lines
4.2 KiB
Python
import json
|
||
import logging
|
||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def stripCodeFences(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) from text.

    The input is trimmed first. When the trimmed text both starts and ends
    with three backticks, the opening fence and the closing fence are removed
    and the remainder is trimmed again. If the opening fence is followed by a
    newline, everything up to that newline is treated as a language tag (for
    example ``json``) and dropped with it. A fence written on a single line,
    such as ```{"a": 1}```, has no tag line, so only the backticks themselves
    are removed.

    Args:
        text: Raw text that may be wrapped in a code fence.

    Returns:
        The unfenced, trimmed text; the trimmed input when it is not wrapped
        in a fence; or ``text`` itself when it is empty or None.
    """
    if not text:
        return text

    fence = "```"
    s = text.strip()
    if not (s.startswith(fence) and s.endswith(fence)):
        return s

    # Always drop the opening backticks. Previously they were only removed
    # when a newline followed, which left single-line fences half-stripped.
    body = s[len(fence):]

    # With a newline present, the rest of the fence line is the language tag.
    _tag, newline, rest = body.partition("\n")
    if newline:
        body = rest

    # The closing fence may already be gone for degenerate inputs whose
    # opening and closing backticks overlap (e.g. "```"), hence the re-check.
    if body.endswith(fence):
        body = body[:-len(fence)]
    return body.strip()
|
||
|
||
|
||
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring of text.

    Scanning starts at the first '{' or '[' (whichever comes first) in the
    trimmed input and stops at the closer that balances it. Brackets that
    appear inside double-quoted string literals are ignored, and backslash
    escapes inside those literals are honoured, so a value such as "}" does
    not end the scan early.

    Args:
        text: Text that may contain a JSON object or array, possibly
            surrounded by other content.

    Returns:
        The balanced substring when one is found; otherwise the trimmed
        input. Empty or None input is returned unchanged.
    """
    if not text:
        return text
    s = text.strip()

    # Earliest opener of either kind decides where the candidate begins.
    openers = [pos for pos in (s.find('{'), s.find('[')) if pos != -1]
    if not openers:
        return s
    start = min(openers)

    closer_for = {'{': '}', '[': ']'}
    stack: List[str] = []
    in_string = False  # currently inside a "..." literal
    escaped = False    # previous character inside the literal was a backslash

    for i in range(start, len(s)):
        ch = s[i]
        if in_string:
            # Inside a string only an unescaped quote is structurally relevant.
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in '{[':
            stack.append(ch)
        elif ch in '}]':
            if not stack:
                continue
            opener = stack.pop()
            # A mismatched closer discards the popped opener and scanning
            # continues, which keeps the lenient behaviour on malformed input.
            if closer_for[opener] != ch:
                continue
            if not stack:
                return s[start:i + 1]
    # No balanced span found: fall back to the trimmed input.
    return s
|
||
|
||
|
||
def normalizeJsonText(text: str) -> str:
    """Light normalization: remove leading BOM(s), straighten smart quotes.

    Curly quotes are converted to their straight equivalents only when they
    appear outside a straight-double-quoted string literal. Replacing them
    inside such a literal would inject unescaped '"' characters into a string
    value and turn valid JSON into invalid JSON, so that text is left as-is.

    Args:
        text: Raw text that may contain a BOM and/or typographic quotes.

    Returns:
        The normalized text, or ``text`` unchanged when it is empty or None.
    """
    if not text:
        return text

    # lstrip removes every leading U+FEFF, and is a no-op when none is present.
    s = text.lstrip('\ufeff')

    straight = {'“': '"', '”': '"', '’': "'", '‘': "'"}
    if not any(curly in s for curly in straight):
        return s  # fast path: nothing to normalize

    pieces = []
    in_string = False  # inside a straight-double-quoted literal
    escaped = False    # previous character in the literal was a backslash
    for ch in s:
        if in_string:
            if ch == '\n':
                # JSON strings cannot contain a raw newline, so a stray quote
                # in surrounding prose only affects the rest of its own line.
                in_string = False
                escaped = False
            elif escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            pieces.append(ch)
        elif ch == '"':
            in_string = True
            pieces.append(ch)
        else:
            pieces.append(straight.get(ch, ch))
    return ''.join(pieces)
|
||
|
||
|
||
def extractJsonString(text: str) -> str:
    """Run the cleanup pipeline and return the trimmed JSON candidate.

    The stages run in this order: normalizeJsonText, stripCodeFences,
    extractFirstBalancedJson. The final result is stripped of surrounding
    whitespace.
    """
    candidate = text
    for stage in (normalizeJsonText, stripCodeFences, extractFirstBalancedJson):
        candidate = stage(candidate)
    return candidate.strip()
|
||
|
||
|
||
def tryParseJson(text: Union[str, bytes, bytearray]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Extract and parse JSON without raising; return (obj, error, cleaned_str).

    Args:
        text: JSON-bearing text. ``bytes``/``bytearray`` input is decoded as
            UTF-8 with invalid sequences replaced. Falsy input (including
            None) is treated as an empty string.

    Returns:
        A 3-tuple ``(obj, error, cleaned)``:
          - ``obj``: the value produced by ``json.loads`` on success, else None.
            NOTE(review): ``json.loads`` also accepts bare scalars (including
            ``null``), so ``obj`` is not guaranteed to be a dict/list; check
            ``error is None`` rather than ``obj is not None`` to detect success.
          - ``error``: the exception raised while parsing, or None on success.
          - ``cleaned``: the string that was handed to ``json.loads``.
    """
    if isinstance(text, (bytes, bytearray)):
        # errors='replace' substitutes U+FFFD for undecodable bytes, so this
        # call cannot raise a decode error and needs no fallback branch.
        text = text.decode('utf-8', errors='replace')

    cleaned = extractJsonString(text or "")
    try:
        parsed = json.loads(cleaned)
    except Exception as exc:
        # Deliberately broad: this function's contract is to report parse
        # failures through the return value instead of raising.
        return None, exc, cleaned
    return parsed, None, cleaned
|
||
|
||
|
||
def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]:
    """Parse JSON from text via tryParseJson, re-raising any parse error.

    On failure an error is logged with the first 200 characters of the
    cleaned text, then the exception reported by tryParseJson is raised.
    On success the parsed value is returned.
    """
    parsed, failure, candidate = tryParseJson(text)
    if failure is None:
        return parsed
    logger.error(f"parse_json_or_raise failed: {failure}. Cleaned preview: {candidate[:200]}...")
    raise failure
|
||
|
||
|
||
def mergeRootLists(json_parts: List[Union[str, Dict, List]]) -> Dict[str, Any]:
    """Merge several JSON parts by concatenating their root-level lists.

    Each part is either an already-parsed dict/list or text that is parsed
    with tryParseJson. Parts that fail to parse, and parts whose root is not
    a dict, are skipped. The first dict is the base; for each later dict:
      - if a value is a list and the base holds a list under the same key,
        the base list is extended with it
      - if the key is absent from the base, it is added
      - for any other key, the base value (from the first part) is kept
    If the merged result has a 'continuation' key, it is set to None.

    The inputs are never mutated: root-level lists are copied before they are
    stored in or extended within the result. The copy is shallow, so elements
    inside those lists are still shared with the inputs.

    Args:
        json_parts: Parts to merge, in order.

    Returns:
        The merged dict, or an empty dict when no part yields a dict.
    """
    parsed: List[Dict[str, Any]] = []
    for part in json_parts:
        if isinstance(part, (dict, list)):
            obj = part
        else:
            obj, err, _ = tryParseJson(part)
            if err is not None:
                continue
        # Only dict roots take part in the merge; list roots are dropped.
        if isinstance(obj, dict):
            parsed.append(obj)
    if not parsed:
        return {}

    def _own(value: Any) -> Any:
        # Copy lists so later extend() calls never touch a caller-owned list.
        return list(value) if isinstance(value, list) else value

    base: Dict[str, Any] = {key: _own(value) for key, value in parsed[0].items()}
    for obj in parsed[1:]:
        for key, value in obj.items():
            if key not in base:
                base[key] = _own(value)
            elif isinstance(value, list) and isinstance(base[key], list):
                base[key].extend(value)
            # Otherwise the key exists with a non-list on either side:
            # keep the value from the earlier part.
    if 'continuation' in base:
        base['continuation'] = None
    return base
|
||
|
||
|