gateway/modules/features/codeeditor/responseParser.py
2026-02-23 18:35:36 +01:00

139 lines
4.3 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Response parser for the CodeEditor feature.
Parses AI responses into typed segments (text, code_block, file_edit)."""
import logging
import re
from typing import List, Optional
from modules.features.codeeditor.datamodelCodeeditor import ResponseSegment, SegmentTypeEnum
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Matches a line that consists solely of a fence marker: three backticks, an
# optional run of word characters (the language tag, captured as group 1) and
# optional trailing whitespace. Tags containing non-word characters (such as
# "c++") do not match this pattern.
_FENCE_PATTERN = re.compile(r"^```(\w*)\s*$", re.MULTILINE)
def parseResponse(rawContent: str) -> List[ResponseSegment]:
    """Split an AI response into an ordered list of typed segments.

    The response is walked line by line. Lines outside fences are gathered
    and emitted as text segments; every fenced block becomes a code-block
    segment, except blocks tagged ``file_edit`` which are handed to
    ``_parseFileEditBlock``. A ``file_edit`` block that cannot be parsed is
    emitted as a code block with language ``"text"``.

    Returns an empty list when the input is empty or only whitespace.
    """
    if not (rawContent and rawContent.strip()):
        return []

    allLines = rawContent.split("\n")
    total = len(allLines)
    parsed = []
    pendingText = []
    cursor = 0

    while cursor < total:
        current = allLines[cursor]
        fence = _FENCE_PATTERN.match(current)

        if not fence:
            # Ordinary line: keep accumulating prose.
            pendingText.append(current)
            cursor += 1
            continue

        # An opening fence ends the running text segment.
        if pendingText:
            _flushTextBuffer(pendingText, parsed)
            pendingText = []

        fenceLang = fence.group(1).strip()
        body, closeIdx = _collectBlock(allLines, cursor + 1)
        code = "\n".join(body)
        # Resume scanning on the line after the closing fence.
        cursor = closeIdx + 1

        if fenceLang == "file_edit":
            edit = _parseFileEditBlock(code)
            if edit:
                parsed.append(edit)
                continue
            # Unparseable edit block: fall back to a plain text code block.
            blockLang = "text"
        else:
            blockLang = fenceLang or "text"

        parsed.append(ResponseSegment(
            type=SegmentTypeEnum.CODE_BLOCK,
            content=code,
            language=blockLang
        ))

    if pendingText:
        _flushTextBuffer(pendingText, parsed)
    return parsed
def _collectBlock(lines: List[str], startIdx: int) -> tuple:
"""Collect lines inside a fenced code block until closing ```."""
blockLines = []
idx = startIdx
while idx < len(lines):
if lines[idx].strip() == "```":
return blockLines, idx
blockLines.append(lines[idx])
idx += 1
return blockLines, idx
def _flushTextBuffer(buffer: List[str], segments: List[ResponseSegment]):
"""Flush accumulated text lines into a text segment."""
text = "\n".join(buffer).strip()
buffer.clear()
if text:
segments.append(ResponseSegment(
type=SegmentTypeEnum.TEXT,
content=text
))
def _parseFileEditBlock(blockContent: str) -> Optional[ResponseSegment]:
    """Parse the body of a ``file_edit`` fence into a FILE_EDIT segment.

    The body is scanned line by line for the markers ``fileName:``,
    ``oldContent:`` and ``newContent:`` (matched after stripping surrounding
    whitespace from the line).

    * ``fileName`` takes its value from the remainder of the marker line.
    * ``oldContent`` / ``newContent`` take an optional inline value from the
      marker line (a lone ``|`` is treated as a block marker and discarded)
      followed by every subsequent line up to the next marker. A leading
      two-space indent is removed from those lines; lines with less
      indentation are kept unchanged.

    Args:
        blockContent: Text between the opening and closing fence lines.

    Returns:
        A ``ResponseSegment`` of type ``FILE_EDIT`` carrying ``fileName``,
        ``oldContent`` and ``newContent``, or ``None`` (after logging a
        warning) when no file name or no new content was captured.
    """
    fieldNames = ("fileName", "oldContent", "newContent")
    fields = {"fileName": None, "oldContent": None, "newContent": None}
    currentField = None
    currentLines = []

    for line in blockContent.split("\n"):
        stripped = line.strip()

        # Does this line open a new field?
        newField = None
        for key in fieldNames:
            if stripped.startswith(f"{key}:"):
                newField = key
                break

        if not newField:
            if currentField in ("oldContent", "newContent"):
                # Remove one two-space indent level. The prefix test must
                # match the number of characters sliced off; testing for a
                # single space would delete the first real character of a
                # line indented by only one space.
                dedented = line[2:] if line.startswith("  ") else line
                currentLines.append(dedented)
            continue

        # A new marker closes the field collected so far.
        if currentField and currentLines:
            fields[currentField] = "\n".join(currentLines)

        value = stripped[len(f"{newField}:"):].strip()
        if newField == "fileName":
            # Single-line field: nothing to accumulate afterwards.
            fields["fileName"] = value if value else None
            currentField = None
            currentLines = []
        else:
            currentField = newField
            # Keep an inline value; a bare "|" only announces a block.
            currentLines = [value] if value and value != "|" else []

    # Store whatever field was still open at the end of the block.
    if currentField and currentLines:
        fields[currentField] = "\n".join(currentLines)

    if not fields["fileName"]:
        logger.warning("file_edit block missing fileName")
        return None
    if fields["newContent"] is None:
        logger.warning(f"file_edit block for {fields['fileName']} missing newContent")
        return None

    return ResponseSegment(
        type=SegmentTypeEnum.FILE_EDIT,
        content=f"Edit: {fields['fileName']}",
        fileName=fields["fileName"],
        oldContent=fields["oldContent"],
        newContent=fields["newContent"]
    )