gateway/modules/serviceCenter/services/serviceGeneration/subDocumentUtility.py
2026-04-29 23:12:46 +02:00

461 lines
No EOL
18 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
import os
import re
from typing import Any, Dict
logger = logging.getLogger(__name__)
# Inline markdown tokenizer. Alternation order matters: images before links
# (an image is a link prefixed with "!"), bold before italic ("**" vs "*").
_INLINE_TOKEN_RE = re.compile(
    r'!\[(?P<imgAlt>[^\]]*)\]\((?P<imgSrc>[^)"]+)(?:\s+"(?P<imgWidth>\d+)pt")?\)'  # image
    r'|\[(?P<linkText>[^\]]+)\]\((?P<linkHref>[^)]+)\)'  # link
    r'|`(?P<code>[^`]+)`'  # inline code
    r'|\*\*(?P<bold>.+?)\*\*'  # bold
    r'|(?<!\w)\*(?P<italic1>.+?)\*(?!\w)'  # italic *x*
    r'|(?<!\w)_(?P<italic2>.+?)_(?!\w)'  # italic _x_
)


def _parseInlineRuns(text: str) -> list:
    """
    Parse inline markdown formatting into a list of InlineRun dicts.

    Handles images, links, inline code, bold, italic and plain text. Tokens
    are recognized left-to-right; text between tokens becomes "text" runs.

    Args:
        text: Markdown text of one paragraph, list item or table cell.

    Returns:
        list: Run dicts, each with "type" and "value". Link runs add "href".
        Image runs add "fileId" (for "file:<id>" sources) or "href", plus
        "widthPt" when a '"<n>pt"' width hint is present. Empty input yields
        a single empty text run.
    """
    if not text:
        return [{"type": "text", "value": ""}]

    runs = []
    lastEnd = 0
    for m in _INLINE_TOKEN_RE.finditer(text):
        # Plain text between the previous token and this one
        if m.start() > lastEnd:
            runs.append({"type": "text", "value": text[lastEnd:m.start()]})

        if m.group("imgAlt") is not None or m.group("imgSrc") is not None:
            alt = (m.group("imgAlt") or "").strip() or "Image"
            src = (m.group("imgSrc") or "").strip()
            widthStr = m.group("imgWidth")
            run = {"type": "image", "value": alt}
            # "file:<id>" references an uploaded file; anything else is a URL
            if src.startswith("file:"):
                run["fileId"] = src[5:]
            else:
                run["href"] = src
            if widthStr:
                run["widthPt"] = int(widthStr)
            runs.append(run)
        elif m.group("linkText") is not None:
            runs.append({"type": "link", "value": m.group("linkText"), "href": m.group("linkHref")})
        elif m.group("code") is not None:
            runs.append({"type": "code", "value": m.group("code")})
        elif m.group("bold") is not None:
            runs.append({"type": "bold", "value": m.group("bold")})
        elif m.group("italic1") is not None:
            runs.append({"type": "italic", "value": m.group("italic1")})
        elif m.group("italic2") is not None:
            runs.append({"type": "italic", "value": m.group("italic2")})
        lastEnd = m.end()

    # Trailing plain text (for non-empty input without tokens this is the whole text)
    if lastEnd < len(text):
        runs.append({"type": "text", "value": text[lastEnd:]})
    return runs


def markdownToDocumentJson(markdown: str, title: str, language: str = "de") -> Dict[str, Any]:
    """
    Convert markdown content to the standard document JSON format with Inline-Run model.

    Recognized blocks: headings, fenced code blocks, pipe tables, bullet and
    numbered lists, standalone images and paragraphs. Paragraphs, list items
    and table cells carry inlineRuns (lists of run dicts) instead of plain
    strings; headings and code blocks keep plain text.

    A line that looks like the start of a block but does not parse as one
    (for example a "| a | b |" row without a separator row below it) is
    emitted as paragraph text instead of being dropped.

    Args:
        markdown: Markdown source. Non-string values are converted with
            str(); falsy non-string values become "".
        title: Document title, stored in the metadata and on the document.
        language: Language code stored in the metadata.

    Returns:
        Dict[str, Any]: {"metadata": {...}, "documents": [{"id": "doc_1",
        "title": title, "sections": [...]}]}. Each section has "id"
        ("s_<n>"), "content_type", "order" and a single-element "elements"
        list holding the content dict.
    """
    if not isinstance(markdown, str):
        markdown = str(markdown) if markdown else ""

    # Compiled once per call instead of being looked up again for every line
    headingRe = re.compile(r"^(#{1,6})\s+(.+)")
    fenceRe = re.compile(r"^```(\w*)")
    tableRowRe = re.compile(r"^\|(.+)\|$")
    tableSeparatorRe = re.compile(r"^\|[\s\-:|]+\|$")
    listItemRe = re.compile(r"^(\s*)([-*+]|\d+[.)]) (.+)")
    numberedMarkerRe = re.compile(r"\d+[.)]")
    imageLineRe = re.compile(r"^!\[([^\]]*)\]\(([^)\"]+)(?:\s+\"(\d+)pt\")?\)\s*$")
    # Lines that end a running paragraph because they may start another block
    blockStartRe = re.compile(
        r"^(#{1,6}\s|```|\|.+\||!\[[^\]]*\]\([^)]+\)\s*$|(\s*)([-*+]|\d+[.)]) )"
    )

    sections = []
    lines = markdown.split("\n")
    i = 0

    def _addSection(contentType: str, content: Dict[str, Any]) -> None:
        # Section ids and order values are 1-based and follow document order
        order = len(sections) + 1
        sections.append({
            "id": f"s_{order}", "content_type": contentType, "order": order,
            "elements": [{"content": content}],
        })

    while i < len(lines):
        line = lines[i]

        # Headings (plain text, no inline formatting)
        headingMatch = headingRe.match(line)
        if headingMatch:
            _addSection("heading", {
                "text": headingMatch.group(2).strip(),
                "level": len(headingMatch.group(1)),
            })
            i += 1
            continue

        # Fenced code blocks (no inline formatting)
        fenceMatch = fenceRe.match(line)
        if fenceMatch:
            codeLines = []
            i += 1
            while i < len(lines) and not lines[i].startswith("```"):
                codeLines.append(lines[i])
                i += 1
            i += 1  # skip the closing fence (an unterminated block runs to the end)
            _addSection("code_block", {
                "code": "\n".join(codeLines),
                "language": fenceMatch.group(1) or "text",
            })
            continue

        # Tables - cells are List[InlineRun]; a header row needs a separator row below it
        tableMatch = tableRowRe.match(line)
        if tableMatch and (i + 1) < len(lines) and tableSeparatorRe.match(lines[i + 1]):
            headerCells = [_parseInlineRuns(c.strip()) for c in tableMatch.group(1).split("|")]
            i += 2
            rows = []
            while i < len(lines) and tableRowRe.match(lines[i]):
                rows.append([_parseInlineRuns(c.strip()) for c in lines[i][1:-1].split("|")])
                i += 1
            _addSection("table", {"headers": headerCells, "rows": rows})
            continue

        # Bullet / numbered lists - items are List[List[InlineRun]]
        listMatch = listItemRe.match(line)
        if listMatch:
            # The first item's marker decides the list type for the whole run
            isNumbered = bool(numberedMarkerRe.match(listMatch.group(2)))
            items = []
            while i < len(lines):
                itemMatch = listItemRe.match(lines[i])
                if not itemMatch:
                    break
                items.append(_parseInlineRuns(itemMatch.group(3).strip()))
                i += 1
            _addSection("bullet_list", {
                "items": items,
                "list_type": "numbered" if isNumbered else "bullet",
            })
            continue

        # Empty lines
        if not line.strip():
            i += 1
            continue

        # Standalone image on its own line -> block-level image section
        imgMatch = imageLineRe.match(line)
        if imgMatch:
            src = imgMatch.group(2).strip()
            widthStr = imgMatch.group(3)
            fileId = src[5:] if src.startswith("file:") else ""
            content = {
                "altText": imgMatch.group(1).strip() or "Image",
                "base64Data": "",
                "_fileRef": fileId,
                "_srcUrl": src if not fileId else "",
            }
            if widthStr:
                content["widthPt"] = int(widthStr)
            _addSection("image", content)
            i += 1
            continue

        # Paragraph - produces inlineRuns. The current line is always taken:
        # it is not a valid heading/code/table/list/image, so even if it
        # resembles a block start it must be kept as text, not dropped.
        paraLines = [line]
        i += 1
        while i < len(lines) and lines[i].strip() and not blockStartRe.match(lines[i]):
            paraLines.append(lines[i])
            i += 1
        _addSection("paragraph", {"inlineRuns": _parseInlineRuns(" ".join(paraLines))})

    if not sections:
        fallbackText = markdown.strip() or "(empty)"
        _addSection("paragraph", {"inlineRuns": _parseInlineRuns(fallbackText)})

    return {
        "metadata": {
            "split_strategy": "single_document",
            "source_documents": [],
            "extraction_method": "file_create_rendering",
            "title": title,
            "language": language,
        },
        "documents": [{
            "id": "doc_1",
            "title": title,
            "sections": sections,
        }],
    }
def getFileExtension(fileName: str) -> str:
    """Return the lowercased text after the last dot of fileName, or '' if it has no dot."""
    if '.' not in fileName:
        return ''
    # rpartition splits at the last dot; index 2 is everything after it
    return fileName.rpartition('.')[2].lower()
def getMimeTypeFromExtension(extension: str) -> str:
    """
    Look up the MIME type for a file extension.

    Args:
        extension: File extension, with or without a single leading dot;
            matching is case-insensitive.

    Returns:
        str: The mapped MIME type, or 'application/octet-stream' when the
        extension is not in the table.
    """
    knownTypes = {
        # Text and source formats
        'txt': 'text/plain',
        'json': 'application/json',
        'xml': 'application/xml',
        'csv': 'text/csv',
        'html': 'text/html',
        'htm': 'text/html',
        'md': 'text/markdown',
        'py': 'text/x-python',
        'js': 'application/javascript',
        'css': 'text/css',
        # Office and document formats
        'pdf': 'application/pdf',
        'doc': 'application/msword',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'xls': 'application/vnd.ms-excel',
        'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'ppt': 'application/vnd.ms-powerpoint',
        'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        # Images
        'svg': 'image/svg+xml',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'gif': 'image/gif',
        'bmp': 'image/bmp',
        'webp': 'image/webp',
        # Archives
        'zip': 'application/zip',
        'rar': 'application/x-rar-compressed',
        '7z': 'application/x-7z-compressed',
        'tar': 'application/x-tar',
        'gz': 'application/gzip',
    }
    # Only one leading dot is removed, so "..txt" stays unknown
    lookupKey = extension[1:] if extension.startswith('.') else extension
    return knownTypes.get(lookupKey.lower(), 'application/octet-stream')
def detectContentTypeFromData(fileData: bytes, fileName: str) -> str:
    """
    Detect content type from file data and fileName.

    The file extension is checked first; when it is missing or unknown the
    leading bytes of fileData are compared against known signatures.

    Args:
        fileData: Raw file data as bytes. None or empty data cannot be
            sniffed and yields the default type unless the extension is known.
        fileName: Name of the file. May be None or empty, in which case only
            the content is inspected.

    Returns:
        str: Detected MIME type, 'application/octet-stream' when nothing
        matches or detection fails.
    """
    try:
        # Check file extension first; "or ''" keeps a missing name from
        # raising and thereby skipping content sniffing altogether
        ext = os.path.splitext(fileName or "")[1].lower()
        if ext:
            # Map common extensions to MIME types
            extToMime = {
                '.txt': 'text/plain',
                '.md': 'text/markdown',
                '.csv': 'text/csv',
                '.json': 'application/json',
                '.xml': 'application/xml',
                '.js': 'application/javascript',
                # Same value getMimeTypeFromExtension reports for 'py'
                '.py': 'text/x-python',
                '.svg': 'image/svg+xml',
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg',
                '.png': 'image/png',
                '.gif': 'image/gif',
                '.bmp': 'image/bmp',
                '.webp': 'image/webp',
                '.pdf': 'application/pdf',
                '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                '.doc': 'application/msword',
                '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                '.xls': 'application/vnd.ms-excel',
                '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
                '.ppt': 'application/vnd.ms-powerpoint',
                '.html': 'text/html',
                '.htm': 'text/html',
                '.css': 'text/css',
                '.zip': 'application/zip',
                '.rar': 'application/x-rar-compressed',
                '.7z': 'application/x-7z-compressed',
                '.tar': 'application/x-tar',
                '.gz': 'application/gzip'
            }
            if ext in extToMime:
                return extToMime[ext]

        # Nothing to sniff
        if not fileData:
            return 'application/octet-stream'

        # Try to detect from content
        if fileData.startswith(b'%PDF'):
            return 'application/pdf'
        if fileData.startswith(b'PK\x03\x04'):
            # ZIP-based formats (docx, xlsx, pptx) cannot be told apart here
            return 'application/zip'
        if fileData.startswith(b'<'):
            # XML-based formats; errors='ignore' means decoding cannot fail
            text = fileData.decode('utf-8', errors='ignore').lower()
            if '<svg' in text:
                return 'image/svg+xml'
            if '<html' in text:
                return 'text/html'
            return 'application/xml'
        if fileData.startswith(b'\x89PNG\r\n\x1a\n'):
            return 'image/png'
        if fileData.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if fileData.startswith((b'GIF87a', b'GIF89a')):
            return 'image/gif'
        if fileData.startswith(b'BM'):
            return 'image/bmp'
        if fileData.startswith(b'RIFF') and fileData[8:12] == b'WEBP':
            return 'image/webp'
        return 'application/octet-stream'
    except Exception as e:
        # Best effort: e.g. non-bytes input ends up here
        logger.error(f"Error detecting content type from data: {str(e)}")
        return 'application/octet-stream'
def detectMimeTypeFromData(file_bytes: bytes, fileName: str, service=None) -> str:
    """
    Detect the MIME type of file_bytes, asking the service first when it can answer.

    Args:
        file_bytes: Raw file content.
        fileName: Name of the file.
        service: Optional object; used only when it is truthy and has a
            detectContentTypeFromData attribute.

    Returns:
        str: The service's answer when it is non-empty and not
        'application/octet-stream'; otherwise the result of the module-level
        detectContentTypeFromData. 'application/octet-stream' on any error.
    """
    try:
        serviceCanDetect = bool(service) and hasattr(service, 'detectContentTypeFromData')
        if serviceCanDetect:
            serviceAnswer = service.detectContentTypeFromData(file_bytes, fileName)
            # An empty or generic answer is treated as "don't know"
            if serviceAnswer and serviceAnswer != 'application/octet-stream':
                return serviceAnswer
        # Fallback: use our consolidated function
        return detectContentTypeFromData(file_bytes, fileName)
    except Exception as err:
        logger.warning(f"Error in MIME type detection for {fileName}: {str(err)}")
        return 'application/octet-stream'
def detectMimeTypeFromContent(content: Any, fileName: str, service=None) -> str:
    """
    Detect MIME type from content and fileName using a service if provided.

    The content is converted to bytes and handed to detectMimeTypeFromData.

    Args:
        content: The content to inspect. bytes/bytearray are used as-is,
            str is UTF-8 encoded, dict is serialized as JSON, anything else
            is converted with str().
        fileName: Name of the file.
        service: Optional service object forwarded to detectMimeTypeFromData.

    Returns:
        str: Detected MIME type, 'application/octet-stream' on any error.
    """
    try:
        if isinstance(content, (bytes, bytearray)):
            # Keep raw bytes intact: str(b"...") would yield the "b'...'"
            # repr and hide the file signature from content sniffing
            file_bytes = bytes(content)
        elif isinstance(content, str):
            file_bytes = content.encode('utf-8')
        elif isinstance(content, dict):
            file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8')
        else:
            file_bytes = str(content).encode('utf-8')
        return detectMimeTypeFromData(file_bytes, fileName, service)
    except Exception as e:
        logger.warning(f"Error in MIME type detection for {fileName}: {str(e)}")
        return 'application/octet-stream'
def _rowsToCsvText(rows: list) -> str:
    """Serialize a list of rows (dicts, or sequences of cell values) to CSV text."""
    import csv
    import io

    output = io.StringIO()
    if rows and isinstance(rows[0], dict):
        # Columns are the union of all row keys in first-seen order. Taking
        # only the first row's keys makes DictWriter raise as soon as a
        # later row carries an extra key.
        columns = {}
        for row in rows:
            if isinstance(row, dict):
                columns.update(dict.fromkeys(row))
        writer = csv.DictWriter(output, fieldnames=list(columns))
        writer.writeheader()
        writer.writerows(rows)
    else:
        csv.writer(output).writerows(rows)
    return output.getvalue()


def convertDocumentDataToString(document_data: Any, file_extension: str) -> str:
    """
    Convert document data to string content based on file type.

    Args:
        document_data: None, bytes, str, dict, list or any other object.
        file_extension: Target extension without dot (e.g. 'json', 'csv',
            'txt'); compared case-sensitively.

    Returns:
        str: "" for None; decoded text for bytes (UTF-8, falling back to
        latin1); the string itself for str. For dicts: pretty JSON for
        'json'; the first usable text field for text-like extensions; CSV or
        the first string field for 'csv'; pretty JSON otherwise. For lists:
        CSV for 'csv', pretty JSON otherwise. Any other object is converted
        with str(). On error the result is str(document_data).
    """
    try:
        if document_data is None:
            return ""

        if isinstance(document_data, bytes):
            # IMPORTANT: decode bytes to string for text files (HTML, text, etc.)
            try:
                return document_data.decode('utf-8')
            except UnicodeDecodeError:
                # latin1 maps every byte value, so this fallback cannot fail
                return document_data.decode('latin1')

        if isinstance(document_data, str):
            return document_data

        if isinstance(document_data, dict):
            if file_extension == 'json':
                return json.dumps(document_data, indent=2, ensure_ascii=False)

            if file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']:
                # First field holding a string or a JSON-serializable container wins
                text_fields = ['content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data']
                for field in text_fields:
                    if field in document_data:
                        content = document_data[field]
                        if isinstance(content, str):
                            return content
                        if isinstance(content, (dict, list)):
                            return json.dumps(content, indent=2, ensure_ascii=False)
                return json.dumps(document_data, indent=2, ensure_ascii=False)

            if file_extension == 'csv':
                csv_fields = ['table_data', 'csv_data', 'rows', 'data', 'content', 'text']
                for field in csv_fields:
                    if field in document_data:
                        content = document_data[field]
                        if isinstance(content, str):
                            return content
                        # Only non-empty lists of rows are tabular; other
                        # lists are skipped and the next field is tried
                        if isinstance(content, list) and content and isinstance(content[0], (list, dict)):
                            return _rowsToCsvText(content)
                return json.dumps(document_data, indent=2, ensure_ascii=False)

            return json.dumps(document_data, indent=2, ensure_ascii=False)

        if isinstance(document_data, list):
            if file_extension == 'csv':
                return _rowsToCsvText(document_data)
            return json.dumps(document_data, indent=2, ensure_ascii=False)

        return str(document_data)
    except Exception as e:
        logger.error(f"Error converting document data to string: {str(e)}")
        return str(document_data)