# Copyright (c) 2025 Patrick Motsch # All rights reserved. import json import logging import os import re from typing import Any, Dict logger = logging.getLogger(__name__) def _parseInlineRuns(text: str) -> list: """ Parse inline markdown formatting into a list of InlineRun dicts. Handles: images, links, bold, italic, inline code, plain text. Uses a regex-based tokenizer that processes tokens left-to-right. """ if not text: return [{"type": "text", "value": ""}] # Pattern order matters: images before links, bold before italic _TOKEN_RE = re.compile( r'!\[(?P[^\]]*)\]\((?P[^)"]+)(?:\s+"(?P\d+)pt")?\)' # image r'|\[(?P[^\]]+)\]\((?P[^)]+)\)' # link r'|`(?P[^`]+)`' # inline code r'|\*\*(?P.+?)\*\*' # bold r'|(?.+?)\*(?!\w)' # italic *x* r'|(?.+?)_(?!\w)' # italic _x_ ) runs = [] lastEnd = 0 for m in _TOKEN_RE.finditer(text): # Plain text before this match if m.start() > lastEnd: runs.append({"type": "text", "value": text[lastEnd:m.start()]}) if m.group("imgAlt") is not None or m.group("imgSrc") is not None: alt = (m.group("imgAlt") or "").strip() or "Image" src = (m.group("imgSrc") or "").strip() widthStr = m.group("imgWidth") run = {"type": "image", "value": alt} if src.startswith("file:"): run["fileId"] = src[5:] else: run["href"] = src if widthStr: run["widthPt"] = int(widthStr) runs.append(run) elif m.group("linkText") is not None: runs.append({"type": "link", "value": m.group("linkText"), "href": m.group("linkHref")}) elif m.group("code") is not None: runs.append({"type": "code", "value": m.group("code")}) elif m.group("bold") is not None: runs.append({"type": "bold", "value": m.group("bold")}) elif m.group("italic1") is not None: runs.append({"type": "italic", "value": m.group("italic1")}) elif m.group("italic2") is not None: runs.append({"type": "italic", "value": m.group("italic2")}) lastEnd = m.end() # Trailing plain text if lastEnd < len(text): runs.append({"type": "text", "value": text[lastEnd:]}) return runs if runs else [{"type": "text", "value": text}] 
# --- Block-level markdown patterns (compiled once at import time; they were
# --- previously re-compiled on every line of every document) ---
_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)")
_FENCE_RE = re.compile(r"^```(\w*)")
_TABLE_ROW_RE = re.compile(r"^\|(.+)\|$")
_TABLE_SEP_RE = re.compile(r"^\|[\s\-:|]+\|$")
_LIST_ITEM_RE = re.compile(r"^(\s*)([-*+]|\d+[.)]) (.+)")
_BLOCK_IMAGE_RE = re.compile(r"^!\[([^\]]*)\]\(([^)\"]+)(?:\s+\"(\d+)pt\")?\)\s*$")
# A paragraph ends where any other block construct begins.
_PARA_BREAK_RE = re.compile(
    r"^(#{1,6}\s|```|\|.+\||!\[[^\]]*\]\([^)]+\)\s*$|(\s*)([-*+]|\d+[.)]) )"
)


def markdownToDocumentJson(markdown: str, title: str, language: str = "de") -> Dict[str, Any]:
    """
    Convert markdown content to the standard document JSON format with
    Inline-Run model. Sections use inlineRuns (list of run dicts) instead
    of plain text strings.

    Supports headings, code blocks, tables, lists, images, paragraphs.

    Args:
        markdown: Markdown source text (non-str input is stringified).
        title: Document title placed into metadata and the single document.
        language: Language code stored in metadata (default "de").

    Returns:
        Dict[str, Any]: {"metadata": {...}, "documents": [{..., "sections": [...]}]}
    """
    if not isinstance(markdown, str):
        markdown = str(markdown) if markdown else ""

    sections = []
    order = 0
    lines = markdown.split("\n")
    i = 0

    def _nextId():
        # Sections are numbered 1..n; "order" is also stored on each section.
        nonlocal order
        order += 1
        return f"s_{order}"

    while i < len(lines):
        line = lines[i]

        # Headings (plain text, no inline formatting)
        headingMatch = _HEADING_RE.match(line)
        if headingMatch:
            level = len(headingMatch.group(1))
            text = headingMatch.group(2).strip()
            sections.append({
                "id": _nextId(),
                "content_type": "heading",
                "order": order,
                "elements": [{"content": {"text": text, "level": level}}],
            })
            i += 1
            continue

        # Fenced code blocks (no inline formatting)
        codeMatch = _FENCE_RE.match(line)
        if codeMatch:
            lang = codeMatch.group(1) or "text"
            codeLines = []
            i += 1
            while i < len(lines) and not lines[i].startswith("```"):
                codeLines.append(lines[i])
                i += 1
            i += 1  # skip the closing fence (harmless past EOF on unterminated block)
            sections.append({
                "id": _nextId(),
                "content_type": "code_block",
                "order": order,
                "elements": [{"content": {"code": "\n".join(codeLines), "language": lang}}],
            })
            continue

        # Tables - cells are List[InlineRun]; require a separator row on the next line
        tableMatch = _TABLE_ROW_RE.match(line)
        if tableMatch and (i + 1) < len(lines) and _TABLE_SEP_RE.match(lines[i + 1]):
            headerCells = [_parseInlineRuns(c.strip()) for c in tableMatch.group(1).split("|")]
            i += 2
            rows = []
            while i < len(lines) and _TABLE_ROW_RE.match(lines[i]):
                rowCells = [_parseInlineRuns(c.strip()) for c in lines[i][1:-1].split("|")]
                rows.append(rowCells)
                i += 1
            sections.append({
                "id": _nextId(),
                "content_type": "table",
                "order": order,
                "elements": [{"content": {"headers": headerCells, "rows": rows}}],
            })
            continue

        # Bullet / numbered lists - items are List[List[InlineRun]]
        listMatch = _LIST_ITEM_RE.match(line)
        if listMatch:
            isNumbered = bool(re.match(r"\d+[.)]", listMatch.group(2)))
            items = []
            while i < len(lines):
                m = _LIST_ITEM_RE.match(lines[i])
                if not m:
                    break
                items.append(_parseInlineRuns(m.group(3).strip()))
                i += 1
            # content_type is "bullet_list" for both kinds; list_type disambiguates
            sections.append({
                "id": _nextId(),
                "content_type": "bullet_list",
                "order": order,
                "elements": [{"content": {"items": items, "list_type": "numbered" if isNumbered else "bullet"}}],
            })
            continue

        # Empty lines
        if not line.strip():
            i += 1
            continue

        # Standalone image on its own line -> block-level image section
        imgMatch = _BLOCK_IMAGE_RE.match(line)
        if imgMatch:
            altText = imgMatch.group(1).strip() or "Image"
            src = imgMatch.group(2).strip()
            widthStr = imgMatch.group(3)
            fileId = src[5:] if src.startswith("file:") else ""
            content = {
                "altText": altText,
                "base64Data": "",  # filled in later by the rendering pipeline
                "_fileRef": fileId,
                "_srcUrl": src if not fileId else "",
            }
            if widthStr:
                content["widthPt"] = int(widthStr)
            sections.append({
                "id": _nextId(),
                "content_type": "image",
                "order": order,
                "elements": [{"content": content}],
            })
            i += 1
            continue

        # Paragraph - produces inlineRuns; consecutive lines are joined with spaces
        paraLines = []
        while i < len(lines) and lines[i].strip() and not _PARA_BREAK_RE.match(lines[i]):
            paraLines.append(lines[i])
            i += 1
        if paraLines:
            combinedText = " ".join(paraLines)
            sections.append({
                "id": _nextId(),
                "content_type": "paragraph",
                "order": order,
                "elements": [{"content": {"inlineRuns": _parseInlineRuns(combinedText)}}],
            })
            continue

        # Safety net: a line that matched no construct and produced no paragraph
        # (e.g. a lone |...| row without a separator line) is skipped.
        # NOTE(review): such lines are silently dropped — confirm this is intended.
        i += 1

    if not sections:
        # Guarantee at least one section so consumers never see an empty document
        fallbackText = markdown.strip() or "(empty)"
        sections.append({
            "id": _nextId(),
            "content_type": "paragraph",
            "order": order,
            "elements": [{"content": {"inlineRuns": _parseInlineRuns(fallbackText)}}],
        })

    return {
        "metadata": {
            "split_strategy": "single_document",
            "source_documents": [],
            "extraction_method": "file_create_rendering",
            "title": title,
            "language": language,
        },
        "documents": [{
            "id": "doc_1",
            "title": title,
            "sections": sections,
        }],
    }


def getFileExtension(fileName: str) -> str:
    """Extract file extension from fileName (without dot, lowercased)."""
    if '.' in fileName:
        return fileName.rsplit('.', 1)[-1].lower()
    return ''


def getMimeTypeFromExtension(extension: str) -> str:
    """
    Get MIME type based on file extension.
    This method consolidates MIME type detection from extension.

    Args:
        extension: File extension (with or without dot)

    Returns:
        str: MIME type for the extension ('application/octet-stream' if unknown)
    """
    # Normalize extension (remove dot if present)
    if extension.startswith('.'):
        extension = extension[1:]

    # Map extensions to MIME types
    mime_types = {
        'txt': 'text/plain',
        'json': 'application/json',
        'xml': 'application/xml',
        'csv': 'text/csv',
        'html': 'text/html',
        'htm': 'text/html',
        'md': 'text/markdown',
        'py': 'text/x-python',
        'js': 'application/javascript',
        'css': 'text/css',
        'pdf': 'application/pdf',
        'doc': 'application/msword',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'xls': 'application/vnd.ms-excel',
        'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'ppt': 'application/vnd.ms-powerpoint',
        'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        'svg': 'image/svg+xml',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'gif': 'image/gif',
        'bmp': 'image/bmp',
        'webp': 'image/webp',
        'zip': 'application/zip',
        'rar': 'application/x-rar-compressed',
        '7z': 'application/x-7z-compressed',
        'tar': 'application/x-tar',
        'gz': 'application/gzip',
    }
    return mime_types.get(extension.lower(), 'application/octet-stream')


def detectContentTypeFromData(fileData: bytes, fileName: str) -> str:
    """
    Detect content type from file data and fileName.
    This method makes the MIME type detection function accessible
    through the service center.

    Args:
        fileData: Raw file data as bytes
        fileName: Name of the file

    Returns:
        str: Detected MIME type ('application/octet-stream' if unknown)
    """
    try:
        # Check file extension first
        ext = os.path.splitext(fileName)[1].lower()
        if ext:
            # Map common extensions to MIME types
            # NOTE(review): '.py' maps to 'application/x-python' here but
            # getMimeTypeFromExtension uses 'text/x-python' — confirm which
            # one downstream consumers expect.
            extToMime = {
                '.txt': 'text/plain',
                '.md': 'text/markdown',
                '.csv': 'text/csv',
                '.json': 'application/json',
                '.xml': 'application/xml',
                '.js': 'application/javascript',
                '.py': 'application/x-python',
                '.svg': 'image/svg+xml',
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg',
                '.png': 'image/png',
                '.gif': 'image/gif',
                '.bmp': 'image/bmp',
                '.webp': 'image/webp',
                '.pdf': 'application/pdf',
                '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                '.doc': 'application/msword',
                '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                '.xls': 'application/vnd.ms-excel',
                '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
                '.ppt': 'application/vnd.ms-powerpoint',
                '.html': 'text/html',
                '.htm': 'text/html',
                '.css': 'text/css',
                '.zip': 'application/zip',
                '.rar': 'application/x-rar-compressed',
                '.7z': 'application/x-7z-compressed',
                '.tar': 'application/x-tar',
                '.gz': 'application/gzip',
            }
            if ext in extToMime:
                return extToMime[ext]

        # Try to detect from content (magic bytes)
        if fileData.startswith(b'%PDF'):
            return 'application/pdf'
        elif fileData.startswith(b'PK\x03\x04'):
            # ZIP-based formats (docx, xlsx, pptx)
            return 'application/zip'
        elif fileData.startswith(b'<'):
            # XML-based formats.
            # NOTE(review): this branch was garbled in the original source
            # (markup-stripped); the checks below are a reconstruction —
            # confirm against the upstream version.
            try:
                text = fileData.decode('utf-8', errors='ignore')
                lowered = text.lower()
                if '<html' in lowered or '<!doctype html' in lowered:
                    return 'text/html'
                if '<svg' in lowered:
                    return 'image/svg+xml'
                return 'application/xml'
            except Exception:
                pass
        return 'application/octet-stream'
    except Exception as e:
        logger.warning(f"Error detecting content type for {fileName}: {str(e)}")
        return 'application/octet-stream'


def detectMimeTypeFromData(file_bytes: bytes, fileName: str, service=None) -> str:
    """Detect MIME type from file bytes and fileName using a service if provided."""
    try:
        # Prefer the service's detector, but only trust a non-generic answer
        if service and hasattr(service, 'detectContentTypeFromData'):
            detected = service.detectContentTypeFromData(file_bytes, fileName)
            if detected and detected != 'application/octet-stream':
                return detected
        # Fallback: use our consolidated function
        return detectContentTypeFromData(file_bytes, fileName)
    except Exception as e:
        logger.warning(f"Error in MIME type detection for {fileName}: {str(e)}")
        return 'application/octet-stream'


def detectMimeTypeFromContent(content: Any, fileName: str, service=None) -> str:
    """Detect MIME type from content and fileName using a service if provided."""
    try:
        # Normalize any content shape to bytes before byte-level detection
        if isinstance(content, str):
            file_bytes = content.encode('utf-8')
        elif isinstance(content, dict):
            file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8')
        else:
            file_bytes = str(content).encode('utf-8')
        return detectMimeTypeFromData(file_bytes, fileName, service)
    except Exception as e:
        logger.warning(f"Error in MIME type detection for {fileName}: {str(e)}")
        return 'application/octet-stream'


def _rowsToCsv(rows: list) -> str:
    """Serialize a list of dicts (header from first row's keys) or a list of
    row sequences to CSV text. Shared by both CSV paths below."""
    import csv
    import io
    output = io.StringIO()
    if rows and isinstance(rows[0], dict):
        writer = csv.DictWriter(output, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)
    else:
        writer = csv.writer(output)
        writer.writerows(rows)
    return output.getvalue()


def convertDocumentDataToString(document_data: Any, file_extension: str) -> str:
    """Convert document data to string content based on file type with enhanced processing.

    Args:
        document_data: None, bytes, str, dict, list, or any stringifiable object.
        file_extension: Target extension (without dot) guiding the conversion.

    Returns:
        str: Text representation; on error, str(document_data) as a best effort.
    """
    try:
        if document_data is None:
            return ""

        if isinstance(document_data, bytes):
            # WICHTIG: decode bytes to string for text files (HTML, text, etc.)
            try:
                return document_data.decode('utf-8')
            except UnicodeDecodeError:
                # Fallback: try latin1 or return with error replacement
                try:
                    return document_data.decode('latin1')
                except Exception:
                    return document_data.decode('utf-8', errors='replace')

        if isinstance(document_data, str):
            return document_data

        if isinstance(document_data, dict):
            if file_extension == 'json':
                return json.dumps(document_data, indent=2, ensure_ascii=False)
            elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']:
                # Pull the first recognizable text payload out of the dict
                text_fields = ['content', 'text', 'data', 'result', 'summary',
                               'extracted_content', 'table_data']
                for field in text_fields:
                    if field in document_data:
                        content = document_data[field]
                        if isinstance(content, str):
                            return content
                        elif isinstance(content, (dict, list)):
                            return json.dumps(content, indent=2, ensure_ascii=False)
                return json.dumps(document_data, indent=2, ensure_ascii=False)
            elif file_extension == 'csv':
                csv_fields = ['table_data', 'csv_data', 'rows', 'data', 'content', 'text']
                for field in csv_fields:
                    if field in document_data:
                        content = document_data[field]
                        if isinstance(content, str):
                            return content
                        elif isinstance(content, list):
                            if content and isinstance(content[0], (list, dict)):
                                return _rowsToCsv(content)
                return json.dumps(document_data, indent=2, ensure_ascii=False)
            else:
                return json.dumps(document_data, indent=2, ensure_ascii=False)
        elif isinstance(document_data, list):
            if file_extension == 'csv':
                return _rowsToCsv(document_data)
            else:
                return json.dumps(document_data, indent=2, ensure_ascii=False)
        else:
            return str(document_data)
    except Exception as e:
        logger.error(f"Error converting document data to string: {str(e)}")
        return str(document_data)