gateway/modules/shared/jsonUtils.py
2025-10-20 12:22:01 +02:00

137 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
# Module-level logger named after this module; used for parse-failure reporting.
logger = logging.getLogger(__name__)
def stripCodeFences(text: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) from text.

    The input is trimmed first. Fences are only removed when the trimmed text
    both starts and ends with three backticks; otherwise the trimmed text is
    returned unchanged.

    Args:
        text: Raw text that may be wrapped in a fenced code block.

    Returns:
        The text without the opening/closing fence and without surrounding
        whitespace. Falsy input (``""`` / ``None``) is returned as-is.
    """
    if not text:
        return text

    fence = "```"
    s = text.strip()
    if not (s.startswith(fence) and s.endswith(fence)):
        return s

    newline = s.find("\n", len(fence))
    if newline != -1:
        # Multi-line fence: drop the whole opening line, which also removes an
        # optional language tag such as 'json'.
        s = s[newline + 1:]
    else:
        # Single-line fence (e.g. ```{"a": 1}```): there is no line break to
        # delimit a language tag, so only the backticks themselves are removed.
        s = s[len(fence):]

    # The opening fence may have overlapped the closing one (e.g. input "```"),
    # so re-check before trimming the tail.
    if s.endswith(fence):
        s = s[:-len(fence)]
    return s.strip()
def extractFirstBalancedJson(text: str) -> str:
    """Return the first balanced JSON object/array substring of text.

    Scanning starts at the first '{' or '[' (whichever comes first) and stops
    at the closer that balances it. Brackets that appear inside double-quoted
    JSON string literals are ignored, including after escaped quotes, so a
    value such as ``"}"`` does not terminate the match early.

    Args:
        text: Text that may contain a JSON document surrounded by other text.

    Returns:
        The balanced substring, trimmed. If no opener exists or the structure
        never balances, the trimmed input is returned. Falsy input is returned
        as-is.
    """
    if not text:
        return text
    s = text.strip()

    # Earliest opener of either kind decides where scanning begins.
    openers = [pos for pos in (s.find('{'), s.find('[')) if pos != -1]
    if not openers:
        return s
    start = min(openers)

    matching = {'{': '}', '[': ']'}
    stack: List[str] = []
    in_string = False
    escaped = False
    for i in range(start, len(s)):
        ch = s[i]
        if in_string:
            # Inside a string literal only an unescaped '"' is significant.
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in '{[':
            stack.append(ch)
        elif ch in '}]':
            if not stack:
                continue
            opener = stack.pop()
            if matching[opener] != ch:
                # Mismatched closer: the opener is discarded and scanning goes
                # on (lenient handling kept for malformed input).
                continue
            if not stack:
                return s[start:i + 1].strip()
    return s
def normalizeJsonText(text: str) -> str:
    """Lightly normalize text before JSON extraction.

    Removes leading byte-order marks (U+FEFF) and replaces typographic
    ("smart") quotes with their ASCII equivalents.

    Args:
        text: Raw text, possibly containing a BOM or smart quotes.

    Returns:
        The normalized text. Falsy input (``""`` / ``None``) is returned as-is.
    """
    if not text:
        return text

    # The smart quotes are spelled as escapes so the mapping survives editors
    # and tools that strip or mangle non-ASCII characters in source files.
    smart_quotes = str.maketrans({
        '\u201c': '"',   # left double quotation mark
        '\u201d': '"',   # right double quotation mark
        '\u2018': "'",   # left single quotation mark
        '\u2019': "'",   # right single quotation mark
    })

    # NOTE(review): the replacement applies to the whole text, including the
    # inside of string values - confirm that is acceptable for all callers.
    return text.lstrip('\ufeff').translate(smart_quotes)
def extractJsonString(text: str) -> str:
    """Produce a parse-ready JSON candidate from raw text.

    Applies, in order: text normalization, code-fence removal, and extraction
    of the first balanced JSON object/array. The result is trimmed.
    """
    candidate = extractFirstBalancedJson(
        stripCodeFences(
            normalizeJsonText(text)
        )
    )
    return candidate.strip()
def tryParseJson(text: Union[str, bytes]) -> Tuple[Optional[Union[Dict, List]], Optional[Exception], str]:
    """Extract and parse JSON without raising on parse failure.

    Args:
        text: Raw input. ``bytes``/``bytearray`` are decoded as UTF-8 with
            undecodable bytes replaced; falsy input is treated as ``""``.

    Returns:
        A ``(obj, error, cleaned)`` tuple. On success ``obj`` is the value
        returned by ``json.loads`` and ``error`` is ``None``; on failure
        ``obj`` is ``None`` and ``error`` is the caught exception.
        ``cleaned`` is always the string that was handed to the parser.
    """
    if isinstance(text, (bytes, bytearray)):
        # errors='replace' substitutes U+FFFD for invalid bytes instead of
        # raising, so no fallback path is needed here.
        text = text.decode('utf-8', errors='replace')

    cleaned = extractJsonString(text or "")
    try:
        obj = json.loads(cleaned)
    except Exception as e:
        # Deliberately broad: the contract is to hand any parse failure back
        # to the caller as a value rather than raise it.
        return None, e, cleaned
    return obj, None, cleaned
def parseJsonOrRaise(text: Union[str, bytes]) -> Union[Dict, List]:
    """Parse JSON from text, or log the failure and re-raise the parse error.

    On failure, an error is logged with the exception and the first 200
    characters of the cleaned text, then the original exception is raised.
    """
    parsed, failure, preview = tryParseJson(text)
    if failure is None:
        return parsed
    logger.error(f"parse_json_or_raise failed: {failure}. Cleaned preview: {preview[:200]}...")
    raise failure
def mergeRootLists(json_parts: List[Union[str, Dict, List]]) -> Dict[str, Any]:
    """Merge several JSON parts by concatenating their root-level lists.

    The first dict part is the base. For every later dict part:
      - a list value whose key already holds a list in the result is appended
        to it;
      - a key missing from the result is added;
      - any other key keeps the value already in the result.
    If the result has a 'continuation' key, it is set to None.

    Parts that are neither dicts nor strings parsing to a dict (for example
    root-level arrays or unparsable text) are skipped.

    Args:
        json_parts: Already-parsed objects and/or raw JSON strings.

    Returns:
        A new merged dict, or ``{}`` when no part is a dict. Lists at the root
        of the result are fresh copies, so the input parts are never mutated.
    """
    dict_parts: List[Dict[str, Any]] = []
    for part in json_parts:
        if isinstance(part, (dict, list)):
            obj = part
        else:
            obj, err, _ = tryParseJson(part)
            if err is not None:
                continue
        if isinstance(obj, dict):
            dict_parts.append(obj)

    if not dict_parts:
        return {}

    def _owned(value: Any) -> Any:
        # Copy lists before adopting them: the result extends its lists in
        # place, which must not leak back into the caller's objects.
        return list(value) if isinstance(value, list) else value

    merged: Dict[str, Any] = {key: _owned(value) for key, value in dict_parts[0].items()}
    for extra in dict_parts[1:]:
        for key, value in extra.items():
            if key not in merged:
                merged[key] = _owned(value)
            elif isinstance(value, list) and isinstance(merged[key], list):
                merged[key].extend(value)

    if 'continuation' in merged:
        merged['continuation'] = None
    return merged