# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import csv
import io
import json
import logging
import os
import re
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

# Returned whenever no more specific MIME type can be determined.
_DEFAULT_MIME_TYPE = 'application/octet-stream'

# Single extension -> MIME type table (keys are lowercase, without dot).
_EXTENSION_MIME_TYPES: Dict[str, str] = {
    'txt': 'text/plain',
    'json': 'application/json',
    'xml': 'application/xml',
    'csv': 'text/csv',
    'html': 'text/html',
    'htm': 'text/html',
    'md': 'text/markdown',
    'py': 'text/x-python',
    'js': 'application/javascript',
    'css': 'text/css',
    'pdf': 'application/pdf',
    'doc': 'application/msword',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'xls': 'application/vnd.ms-excel',
    'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'ppt': 'application/vnd.ms-powerpoint',
    'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    'svg': 'image/svg+xml',
    'jpg': 'image/jpeg',
    'jpeg': 'image/jpeg',
    'png': 'image/png',
    'gif': 'image/gif',
    'bmp': 'image/bmp',
    'webp': 'image/webp',
    'zip': 'application/zip',
    'rar': 'application/x-rar-compressed',
    '7z': 'application/x-7z-compressed',
    'tar': 'application/x-tar',
    'gz': 'application/gzip',
}

# NOTE(review): detectContentTypeFromData has always reported '.py' as
# 'application/x-python' while getMimeTypeFromExtension reports
# 'text/x-python'. Both values are preserved for callers; confirm which one
# is intended and drop this override if they should agree.
_DETECT_EXTENSION_OVERRIDES: Dict[str, str] = {
    'py': 'application/x-python',
}

# Extensions whose dict payloads are searched for a ready-made text field.
_TEXT_LIKE_EXTENSIONS = ('txt', 'md', 'html', 'css', 'js', 'py')
_TEXT_FIELD_CANDIDATES = (
    'content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data',
)
_CSV_FIELD_CANDIDATES = ('table_data', 'csv_data', 'rows', 'data', 'content', 'text')

# Markdown line classifiers, compiled once instead of on every loop iteration.
_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)")
_CODE_FENCE_RE = re.compile(r"^```(\w*)")
_TABLE_ROW_RE = re.compile(r"^\|(.+)\|$")
_TABLE_SEPARATOR_RE = re.compile(r"^\|[\s\-:|]+\|$")
_LIST_ITEM_RE = re.compile(r"^(\s*)([-*+]|\d+[.)]) (.+)")
_NUMBERED_MARKER_RE = re.compile(r"\d+[.)]")
_IMAGE_RE = re.compile(r"^!\[([^\]]*)\]\(([^)]+)\)")
# Anything that may start a non-paragraph block; it terminates a paragraph.
_BLOCK_START_RE = re.compile(r"^(#{1,6}\s|```|\|.+\||!\[|(\s*)([-*+]|\d+[.)]) )")


def markdownToDocumentJson(markdown: str, title: str, language: str = "de") -> Dict[str, Any]:
    """
    Convert markdown content to the standard document JSON structure.

    Recognised blocks: headings, fenced code blocks, tables (header row
    followed by a separator row), bullet/numbered lists, images and
    paragraphs. Sections are numbered in document order ("s_1", "s_2", ...).
    Input without any recognisable content yields a single paragraph section.

    Args:
        markdown: Markdown text. Non-string values are stringified; falsy
            non-string values are treated as empty text.
        title: Title stored in the metadata and on the document.
        language: Language code stored in the metadata.

    Returns:
        Dict with "metadata" and a one-element "documents" list whose
        "sections" hold the parsed blocks.
    """
    if not isinstance(markdown, str):
        markdown = str(markdown) if markdown else ""

    sections: List[Dict[str, Any]] = []

    def _addSection(contentType: str, content: Dict[str, Any]) -> None:
        # Order is 1-based and equals the section's position in the list.
        order = len(sections) + 1
        sections.append({
            "id": f"s_{order}",
            "content_type": contentType,
            "order": order,
            "elements": [{"content": content}],
        })

    # Normalise Windows/old-Mac newlines; a trailing "\r" would otherwise
    # defeat the "$"-anchored table patterns.
    lines = markdown.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    total = len(lines)
    i = 0

    while i < total:
        line = lines[i]

        # Headings
        headingMatch = _HEADING_RE.match(line)
        if headingMatch:
            _addSection("heading", {
                "text": headingMatch.group(2).strip(),
                "level": len(headingMatch.group(1)),
            })
            i += 1
            continue

        # Fenced code blocks
        fenceMatch = _CODE_FENCE_RE.match(line)
        if fenceMatch:
            codeLines = []
            i += 1
            while i < total and not lines[i].startswith("```"):
                codeLines.append(lines[i])
                i += 1
            i += 1  # skip the closing fence (or step past the end if unclosed)
            _addSection("code_block", {
                "code": "\n".join(codeLines),
                "language": fenceMatch.group(1) or "text",
            })
            continue

        # Tables: a row only counts as a header when a separator row follows.
        headerMatch = _TABLE_ROW_RE.match(line)
        if headerMatch and (i + 1) < total and _TABLE_SEPARATOR_RE.match(lines[i + 1]):
            headerCells = [cell.strip() for cell in headerMatch.group(1).split("|")]
            i += 2
            rows = []
            while i < total and _TABLE_ROW_RE.match(lines[i]):
                rows.append([cell.strip() for cell in lines[i][1:-1].split("|")])
                i += 1
            _addSection("table", {"headers": headerCells, "rows": rows})
            continue

        # Bullet / numbered lists (the first item decides the list type)
        itemMatch = _LIST_ITEM_RE.match(line)
        if itemMatch:
            isNumbered = bool(_NUMBERED_MARKER_RE.match(itemMatch.group(2)))
            items = []
            while itemMatch:
                items.append({"text": itemMatch.group(3).strip()})
                i += 1
                itemMatch = _LIST_ITEM_RE.match(lines[i]) if i < total else None
            _addSection("bullet_list", {
                "items": items,
                "list_type": "numbered" if isNumbered else "bullet",
            })
            continue

        # Empty lines
        if not line.strip():
            i += 1
            continue

        # Images: only the reference is stored; base64Data stays empty here.
        imageMatch = _IMAGE_RE.match(line)
        if imageMatch:
            src = imageMatch.group(2).strip()
            fileId = src[5:] if src.startswith("file:") else ""
            _addSection("image", {
                "altText": imageMatch.group(1).strip() or "Image",
                "base64Data": "",
                "_fileRef": fileId,
                "_srcUrl": src if not fileId else "",
            })
            i += 1
            continue

        # Paragraph
        paraLines = []
        if _BLOCK_START_RE.match(line):
            # The line looks like a block start but none of the parsers above
            # accepted it (e.g. a table row without separator row, or "![" with
            # no valid image syntax). Keep it as text instead of dropping it.
            paraLines.append(line)
            i += 1
        while i < total and lines[i].strip() and not _BLOCK_START_RE.match(lines[i]):
            paraLines.append(lines[i])
            i += 1
        _addSection("paragraph", {"text": " ".join(paraLines)})

    if not sections:
        _addSection("paragraph", {"text": markdown.strip() or "(empty)"})

    return {
        "metadata": {
            "split_strategy": "single_document",
            "source_documents": [],
            "extraction_method": "file_create_rendering",
            "title": title,
            "language": language,
        },
        "documents": [{
            "id": "doc_1",
            "title": title,
            "sections": sections,
        }],
    }


def getFileExtension(fileName: str) -> str:
    """Return the text after the last dot of fileName, lowercased ('' if none)."""
    if not fileName or '.' not in fileName:
        return ''
    return fileName.rsplit('.', 1)[-1].lower()


def getMimeTypeFromExtension(extension: str) -> str:
    """
    Get the MIME type for a file extension.

    Args:
        extension: File extension, with or without a leading dot, any case.

    Returns:
        str: MIME type for the extension, or 'application/octet-stream' when
        the extension is empty or unknown.
    """
    if not extension:
        return _DEFAULT_MIME_TYPE
    # Normalize extension (remove one leading dot if present)
    if extension.startswith('.'):
        extension = extension[1:]
    return _EXTENSION_MIME_TYPES.get(extension.lower(), _DEFAULT_MIME_TYPE)


def detectContentTypeFromData(fileData: bytes, fileName: str) -> str:
    """
    Detect the MIME type from the file name, falling back to the file content.

    A known extension wins; otherwise the leading bytes are inspected.

    Args:
        fileData: Raw file data as bytes.
        fileName: Name of the file (may be empty).

    Returns:
        str: Detected MIME type, 'application/octet-stream' if undetermined
        or if detection fails.
    """
    try:
        # Check file extension first
        ext = os.path.splitext(fileName)[1].lower() if fileName else ''
        if ext:
            key = ext[1:]
            if key in _DETECT_EXTENSION_OVERRIDES:
                return _DETECT_EXTENSION_OVERRIDES[key]
            if key in _EXTENSION_MIME_TYPES:
                return _EXTENSION_MIME_TYPES[key]

        if not fileData:
            return _DEFAULT_MIME_TYPE

        # Try to detect from content
        if fileData.startswith(b'%PDF'):
            return 'application/pdf'
        if fileData.startswith(b'PK\x03\x04'):
            # ZIP-based formats (docx, xlsx, pptx) are all reported as zip
            return 'application/zip'
        if fileData.startswith(b'<'):
            # XML-based formats
            # NOTE(review): the source text of this branch and of the rest of
            # the function was damaged; the markup checks below are a
            # reconstruction. Confirm against version control.
            head = fileData[:1024].decode('utf-8', errors='ignore').lower()
            if '<svg' in head:
                return 'image/svg+xml'
            if '<html' in head or '<!doctype html' in head:
                return 'text/html'
            return 'application/xml'

        return _DEFAULT_MIME_TYPE
    except Exception as e:
        logger.warning("Error detecting content type for %s: %s", fileName, e)
        return _DEFAULT_MIME_TYPE


# NOTE(review): the original "def" line of this function was lost; name and
# parameter order are taken from the call in detectMimeTypeFromContent.
def detectMimeTypeFromData(file_bytes: bytes, fileName: str, service=None) -> str:
    """
    Detect MIME type from file bytes and fileName using a service if provided.

    Args:
        file_bytes: Raw file content.
        fileName: Name of the file.
        service: Optional object exposing detectContentTypeFromData(bytes, str).

    Returns:
        str: The service's answer when it is specific; otherwise the result of
        the local detectContentTypeFromData.
    """
    if service and hasattr(service, 'detectContentTypeFromData'):
        try:
            detected = service.detectContentTypeFromData(file_bytes, fileName)
        except Exception as e:
            # A failing service must not prevent the local fallback below.
            logger.warning("Error in MIME type detection for %s: %s", fileName, e)
        else:
            if detected and detected != _DEFAULT_MIME_TYPE:
                return detected

    # Fallback: use our consolidated function (it never raises)
    return detectContentTypeFromData(file_bytes, fileName)


def detectMimeTypeFromContent(content: Any, fileName: str, service=None) -> str:
    """
    Detect MIME type from content and fileName using a service if provided.

    Bytes are used as-is, strings are UTF-8 encoded, dicts are serialised to
    JSON, and anything else is stringified before detection.
    """
    try:
        if isinstance(content, (bytes, bytearray)):
            # Pass raw bytes through; str(b'...') would yield "b'...'" and
            # hide the leading magic bytes from detection.
            file_bytes = bytes(content)
        elif isinstance(content, str):
            file_bytes = content.encode('utf-8')
        elif isinstance(content, dict):
            file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8')
        else:
            file_bytes = str(content).encode('utf-8')
        return detectMimeTypeFromData(file_bytes, fileName, service)
    except Exception as e:
        logger.warning("Error in MIME type detection for %s: %s", fileName, e)
        return _DEFAULT_MIME_TYPE


def _rowsToCsv(rows: List[Any]) -> str:
    """Serialise a list of dict rows or sequence rows to CSV text."""
    output = io.StringIO()
    if rows and isinstance(rows[0], dict):
        # Union of all keys in first-seen order: DictWriter raises ValueError
        # on keys missing from fieldnames, so the first row alone is not enough.
        fieldnames = list(dict.fromkeys(
            key for row in rows if isinstance(row, dict) for key in row
        ))
        dictWriter = csv.DictWriter(output, fieldnames=fieldnames)
        dictWriter.writeheader()
        dictWriter.writerows(rows)
    else:
        # Scalars become one-cell rows; a bare string would otherwise be
        # split into one cell per character.
        csv.writer(output).writerows(
            row if isinstance(row, (list, tuple)) else [row] for row in rows
        )
    return output.getvalue()


def convertDocumentDataToString(document_data: Any, file_extension: str) -> str:
    """
    Convert document data to string content based on the target file type.

    Args:
        document_data: None, bytes, str, dict, list or any other object.
        file_extension: Target extension; case and a leading dot are ignored.

    Returns:
        str: '' for None; decoded text for bytes (UTF-8, then latin-1);
        strings unchanged; dicts/lists as JSON, CSV or an embedded text field
        depending on the extension; str(document_data) for other types or
        when conversion fails.
    """
    try:
        if document_data is None:
            return ""

        if isinstance(document_data, (bytes, bytearray)):
            # IMPORTANT: decode bytes to string for text files (HTML, text, etc.)
            try:
                return document_data.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte value, so this cannot fail
                return document_data.decode('latin1')

        if isinstance(document_data, str):
            return document_data

        ext = file_extension.strip().lstrip('.').lower() if isinstance(file_extension, str) else ''

        if isinstance(document_data, dict):
            if ext in _TEXT_LIKE_EXTENSIONS:
                # Return the first candidate field that holds usable content
                for field in _TEXT_FIELD_CANDIDATES:
                    if field not in document_data:
                        continue
                    content = document_data[field]
                    if isinstance(content, str):
                        return content
                    if isinstance(content, (dict, list)):
                        return json.dumps(content, indent=2, ensure_ascii=False)
            elif ext == 'csv':
                for field in _CSV_FIELD_CANDIDATES:
                    if field not in document_data:
                        continue
                    content = document_data[field]
                    if isinstance(content, str):
                        return content
                    if isinstance(content, list) and content and isinstance(content[0], (list, dict)):
                        return _rowsToCsv(content)
            # 'json', unknown extensions, and dicts without a usable field
            return json.dumps(document_data, indent=2, ensure_ascii=False)

        if isinstance(document_data, list):
            if ext == 'csv':
                return _rowsToCsv(document_data)
            return json.dumps(document_data, indent=2, ensure_ascii=False)

        return str(document_data)
    except Exception as e:
        logger.error("Error converting document data to string: %s", e)
        return str(document_data)