496 lines
No EOL
21 KiB
Python
496 lines
No EOL
21 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Base renderer class for all format renderers.
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, Any, Tuple, List
|
|
from modules.datamodels.datamodelJson import supportedSectionTypes
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime, UTC
|
|
import base64
|
|
import io
|
|
from PIL import Image
|
|
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
|
|
|
|
# Module-level logger; BaseRenderer.__init__ stores it on each instance as self.logger.
logger = logging.getLogger(__name__)
|
|
|
|
class BaseRenderer(ABC):
|
|
"""Base class for all format renderers."""
|
|
|
|
def __init__(self, services=None):
|
|
self.logger = logger
|
|
self.services = services # Add services attribute
|
|
|
|
@classmethod
|
|
def getSupportedFormats(cls) -> List[str]:
|
|
"""
|
|
Return list of supported format names for this renderer.
|
|
Override this method in subclasses to specify supported formats.
|
|
"""
|
|
return []
|
|
|
|
@classmethod
|
|
def getFormatAliases(cls) -> List[str]:
|
|
"""
|
|
Return list of format aliases for this renderer.
|
|
Override this method in subclasses to specify format aliases.
|
|
"""
|
|
return []
|
|
|
|
@classmethod
|
|
def getPriority(cls) -> int:
|
|
"""
|
|
Return priority for this renderer (higher number = higher priority).
|
|
Used when multiple renderers support the same format.
|
|
"""
|
|
return 0
|
|
|
|
@abstractmethod
|
|
async def render(self, extractedContent: Dict[str, Any], title: str, userPrompt: str = None, aiService=None) -> Tuple[str, str]:
|
|
"""
|
|
Render extracted JSON content to the target format.
|
|
|
|
Args:
|
|
extractedContent: Structured JSON content with sections and metadata
|
|
title: Report title
|
|
userPrompt: Original user prompt for context
|
|
aiService: AI service instance for additional processing
|
|
|
|
Returns:
|
|
tuple: (renderedContent, mimeType)
|
|
"""
|
|
pass
|
|
|
|
def _extractSections(self, reportData: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""Extract sections from report data."""
|
|
return reportData.get('sections', [])
|
|
|
|
def _extractMetadata(self, reportData: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Extract metadata from report data."""
|
|
return reportData.get('metadata', {})
|
|
|
|
def _getTitle(self, reportData: Dict[str, Any], fallbackTitle: str) -> str:
|
|
"""Get title from report data or use fallback."""
|
|
metadata = reportData.get('metadata', {})
|
|
return metadata.get('title', fallbackTitle)
|
|
|
|
def _validateJsonStructure(self, jsonContent: Dict[str, Any]) -> bool:
|
|
"""Validate that JSON content has the expected structure."""
|
|
if not isinstance(jsonContent, dict):
|
|
return False
|
|
|
|
if "sections" not in jsonContent:
|
|
return False
|
|
|
|
sections = jsonContent.get("sections", [])
|
|
if not isinstance(sections, list):
|
|
return False
|
|
|
|
# Validate each section has content_type and elements
|
|
for section in sections:
|
|
if not isinstance(section, dict):
|
|
return False
|
|
if "content_type" not in section or "elements" not in section:
|
|
return False
|
|
|
|
return True
|
|
|
|
def _getSectionType(self, section: Dict[str, Any]) -> str:
|
|
"""Get the type of a section; default to 'paragraph' for non-dict inputs."""
|
|
if isinstance(section, dict):
|
|
return section.get("content_type", "paragraph")
|
|
# If section is a list or any other type, treat as paragraph elements
|
|
return "paragraph"
|
|
|
|
def _getSectionData(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""Get the elements of a section; if a list is provided directly, return it."""
|
|
if isinstance(section, dict):
|
|
return section.get("elements", [])
|
|
if isinstance(section, list):
|
|
return section
|
|
return []
|
|
|
|
def _getSectionId(self, section: Dict[str, Any]) -> str:
|
|
"""Get the ID of a section (if available)."""
|
|
if isinstance(section, dict):
|
|
return section.get("id", "unknown")
|
|
return "unknown"
|
|
|
|
def _extractTableData(self, sectionData: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]:
|
|
"""Extract table headers and rows from section data."""
|
|
# Normalize when elements array was passed in
|
|
if isinstance(sectionData, list) and sectionData:
|
|
candidate = sectionData[0]
|
|
sectionData = candidate if isinstance(candidate, dict) else {}
|
|
headers = sectionData.get("headers", [])
|
|
rows = sectionData.get("rows", [])
|
|
return headers, rows
|
|
|
|
def _extractBulletListItems(self, sectionData: Dict[str, Any]) -> List[str]:
|
|
"""Extract bullet list items from section data."""
|
|
# Normalize when elements array or raw list was passed in
|
|
if isinstance(sectionData, list):
|
|
# Already a list of items (strings or dicts)
|
|
items = sectionData
|
|
else:
|
|
items = sectionData.get("items", [])
|
|
result = []
|
|
for item in items:
|
|
if isinstance(item, str):
|
|
result.append(item)
|
|
elif isinstance(item, dict) and "text" in item:
|
|
result.append(item["text"])
|
|
return result
|
|
|
|
def _extractHeadingData(self, sectionData: Dict[str, Any]) -> Tuple[int, str]:
|
|
"""Extract heading level and text from section data."""
|
|
# Normalize when elements array was passed in
|
|
if isinstance(sectionData, list) and sectionData:
|
|
sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {}
|
|
level = sectionData.get("level", 1)
|
|
text = sectionData.get("text", "")
|
|
return level, text
|
|
|
|
def _extractParagraphText(self, sectionData: Dict[str, Any]) -> str:
|
|
"""Extract paragraph text from section data."""
|
|
if isinstance(sectionData, list):
|
|
# Join multiple paragraph elements if provided as a list
|
|
texts = []
|
|
for el in sectionData:
|
|
if isinstance(el, dict) and "text" in el:
|
|
texts.append(el["text"])
|
|
elif isinstance(el, str):
|
|
texts.append(el)
|
|
return "\n".join(texts)
|
|
return sectionData.get("text", "")
|
|
|
|
def _extractCodeBlockData(self, sectionData: Dict[str, Any]) -> Tuple[str, str]:
|
|
"""Extract code and language from section data."""
|
|
# Normalize when elements array was passed in
|
|
if isinstance(sectionData, list) and sectionData:
|
|
sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {}
|
|
code = sectionData.get("code", "")
|
|
language = sectionData.get("language", "")
|
|
return code, language
|
|
|
|
def _extractImageData(self, sectionData: Dict[str, Any]) -> Tuple[str, str]:
|
|
"""Extract base64 data and alt text from section data."""
|
|
# Normalize when elements array was passed in
|
|
if isinstance(sectionData, list) and sectionData:
|
|
sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {}
|
|
base64Data = sectionData.get("base64Data", "")
|
|
altText = sectionData.get("altText", "Image")
|
|
return base64Data, altText
|
|
|
|
def _renderImageSection(self, section: Dict[str, Any], styles: Dict[str, Any] = None) -> Any:
|
|
"""
|
|
Render an image section. This is a base implementation that should be overridden
|
|
by format-specific renderers.
|
|
|
|
Args:
|
|
section: Image section data
|
|
styles: Optional styling information
|
|
|
|
Returns:
|
|
Format-specific image representation
|
|
"""
|
|
sectionData = self._getSectionData(section)
|
|
base64Data, altText = self._extractImageData(sectionData)
|
|
|
|
# Base implementation returns a simple dict
|
|
# Format-specific renderers should override this method
|
|
return {
|
|
"content_type": "image",
|
|
"base64Data": base64Data,
|
|
"altText": altText,
|
|
"width": sectionData.get("width", None),
|
|
"height": sectionData.get("height", None),
|
|
"caption": sectionData.get("caption", "")
|
|
}
|
|
|
|
def _validateImageData(self, base64Data: str, altText: str) -> bool:
|
|
"""Validate image data."""
|
|
if not base64Data:
|
|
self.logger.warning("Image section has no base64 data")
|
|
return False
|
|
|
|
if not altText:
|
|
self.logger.warning("Image section has no alt text")
|
|
return False
|
|
|
|
# Basic base64 validation
|
|
try:
|
|
base64.b64decode(base64Data, validate=True)
|
|
return True
|
|
except Exception as e:
|
|
self.logger.warning(f"Invalid base64 image data: {str(e)}")
|
|
return False
|
|
|
|
def _getImageDimensions(self, base64Data: str) -> Tuple[int, int]:
|
|
"""
|
|
Get image dimensions from base64 data.
|
|
This is a helper method that format-specific renderers can use.
|
|
"""
|
|
try:
|
|
# Decode base64 data
|
|
imageData = base64.b64decode(base64Data)
|
|
image = Image.open(io.BytesIO(imageData))
|
|
|
|
return image.size # Returns (width, height)
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not determine image dimensions: {str(e)}")
|
|
return (0, 0)
|
|
|
|
def _resizeImageIfNeeded(self, base64Data: str, maxWidth: int = 800, maxHeight: int = 600) -> str:
|
|
"""
|
|
Resize image if it exceeds maximum dimensions.
|
|
Returns the resized image as base64 string.
|
|
"""
|
|
try:
|
|
# Decode base64 data
|
|
imageData = base64.b64decode(base64Data)
|
|
image = Image.open(io.BytesIO(imageData))
|
|
|
|
# Check if resizing is needed
|
|
width, height = image.size
|
|
if width <= maxWidth and height <= maxHeight:
|
|
return base64Data # No resizing needed
|
|
|
|
# Calculate new dimensions maintaining aspect ratio
|
|
ratio = min(maxWidth / width, maxHeight / height)
|
|
newWidth = int(width * ratio)
|
|
newHeight = int(height * ratio)
|
|
|
|
# Resize image
|
|
resizedImage = image.resize((newWidth, newHeight), Image.Resampling.LANCZOS)
|
|
|
|
# Convert back to base64
|
|
buffer = io.BytesIO()
|
|
resizedImage.save(buffer, format=image.format or 'PNG')
|
|
resizedData = buffer.getvalue()
|
|
|
|
return base64.b64encode(resizedData).decode('utf-8')
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not resize image: {str(e)}")
|
|
return base64Data # Return original if resize fails
|
|
|
|
def _getSupportedSectionTypes(self) -> List[str]:
|
|
"""Return list of supported section types (from unified schema)."""
|
|
return supportedSectionTypes
|
|
|
|
def _isValidSectionType(self, sectionType: str) -> bool:
|
|
"""Check if a section type is valid."""
|
|
return sectionType in self._getSupportedSectionTypes()
|
|
|
|
def _processSectionByType(self, section: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Process a section and return structured data based on its type."""
|
|
sectionType = self._getSectionType(section)
|
|
sectionData = self._getSectionData(section)
|
|
|
|
if sectionType == "table":
|
|
headers, rows = self._extractTableData(sectionData)
|
|
return {"content_type": "table", "headers": headers, "rows": rows}
|
|
elif sectionType == "bullet_list":
|
|
items = self._extractBulletListItems(sectionData)
|
|
return {"content_type": "bullet_list", "items": items}
|
|
elif sectionType == "heading":
|
|
level, text = self._extractHeadingData(sectionData)
|
|
return {"content_type": "heading", "level": level, "text": text}
|
|
elif sectionType == "paragraph":
|
|
text = self._extractParagraphText(sectionData)
|
|
return {"content_type": "paragraph", "text": text}
|
|
elif sectionType == "code_block":
|
|
code, language = self._extractCodeBlockData(sectionData)
|
|
return {"content_type": "code_block", "code": code, "language": language}
|
|
elif sectionType == "image":
|
|
base64Data, altText = self._extractImageData(sectionData)
|
|
# Validate image data
|
|
if self._validateImageData(base64Data, altText):
|
|
return {
|
|
"content_type": "image",
|
|
"base64Data": base64Data,
|
|
"altText": altText,
|
|
"width": sectionData.get("width") if isinstance(sectionData, dict) else None,
|
|
"height": sectionData.get("height") if isinstance(sectionData, dict) else None,
|
|
"caption": sectionData.get("caption", "") if isinstance(sectionData, dict) else ""
|
|
}
|
|
else:
|
|
# Return placeholder if image data is invalid
|
|
return {"content_type": "paragraph", "text": f"[Image: {altText}]"}
|
|
else:
|
|
# Fallback to paragraph
|
|
text = self._extractParagraphText(sectionData)
|
|
return {"content_type": "paragraph", "text": text}
|
|
|
|
def _formatTimestamp(self, timestamp: str = None) -> str:
|
|
"""Format timestamp for display."""
|
|
if timestamp:
|
|
return timestamp
|
|
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
|
|
|
|
# ===== GENERIC AI STYLING HELPERS =====
|
|
|
|
async def _getAiStyles(self, aiService, styleTemplate: str, defaultStyles: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Generic AI styling method that can be used by all renderers.
|
|
|
|
Args:
|
|
aiService: AI service instance
|
|
styleTemplate: Format-specific style template
|
|
defaultStyles: Default styles to fall back to
|
|
|
|
Returns:
|
|
Dict with styling definitions
|
|
"""
|
|
# DEBUG: Show which renderer is calling this method
|
|
|
|
if not aiService:
|
|
return defaultStyles
|
|
|
|
try:
|
|
|
|
requestOptions = AiCallOptions()
|
|
requestOptions.operationType = OperationTypeEnum.DATA_GENERATE
|
|
|
|
request = AiCallRequest(prompt=styleTemplate, context="", options=requestOptions)
|
|
|
|
# DEBUG: Show the actual prompt being sent to AI
|
|
self.logger.debug(f"AI Style Template Prompt:")
|
|
self.logger.debug(f"{styleTemplate}")
|
|
|
|
response = await aiService.callAi(request)
|
|
|
|
# Save styling prompt and response to debug
|
|
self.services.utils.writeDebugFile(styleTemplate, "renderer_styling_prompt")
|
|
self.services.utils.writeDebugFile(response.content or '', "renderer_styling_response")
|
|
|
|
# Clean and parse JSON
|
|
result = response.content.strip() if response and response.content else ""
|
|
|
|
# Check if result is empty
|
|
if not result:
|
|
self.logger.warning("AI styling returned empty response, using defaults")
|
|
return defaultStyles
|
|
|
|
# Extract JSON from markdown if present
|
|
jsonMatch = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
|
|
if jsonMatch:
|
|
result = jsonMatch.group(1).strip()
|
|
elif result.startswith('```json'):
|
|
result = re.sub(r'^```json\s*', '', result)
|
|
result = re.sub(r'\s*```$', '', result)
|
|
elif result.startswith('```'):
|
|
result = re.sub(r'^```\s*', '', result)
|
|
result = re.sub(r'\s*```$', '', result)
|
|
|
|
# Try to parse JSON
|
|
try:
|
|
styles = json.loads(result)
|
|
except json.JSONDecodeError as jsonError:
|
|
self.logger.warning(f"AI styling returned invalid JSON: {jsonError}")
|
|
|
|
# Use print instead of logger to avoid truncation
|
|
self.services.utils.debugLogToFile(f"FULL AI RESPONSE THAT FAILED TO PARSE: {result}", "RENDERER")
|
|
self.services.utils.debugLogToFile(f"RESPONSE LENGTH: {len(result)} characters", "RENDERER")
|
|
|
|
self.logger.warning(f"Raw content that failed to parse: {result}")
|
|
|
|
# Try to fix incomplete JSON by adding missing closing braces
|
|
openBraces = result.count('{')
|
|
closeBraces = result.count('}')
|
|
|
|
if openBraces > closeBraces:
|
|
# JSON is incomplete, add missing closing braces
|
|
missingBraces = openBraces - closeBraces
|
|
result = result + '}' * missingBraces
|
|
self.logger.info(f"Added {missingBraces} missing closing brace(s)")
|
|
self.logger.debug(f"Fixed JSON: {result}")
|
|
|
|
# Try parsing the fixed JSON
|
|
try:
|
|
styles = json.loads(result)
|
|
self.logger.info("Successfully fixed incomplete JSON")
|
|
except json.JSONDecodeError as fixError:
|
|
self.logger.warning(f"Fixed JSON still invalid: {fixError}")
|
|
self.logger.warning(f"Fixed JSON content: {result}")
|
|
# Try to extract just the JSON part if it's embedded in text
|
|
jsonStart = result.find('{')
|
|
jsonEnd = result.rfind('}')
|
|
if jsonStart != -1 and jsonEnd != -1 and jsonEnd > jsonStart:
|
|
jsonPart = result[jsonStart:jsonEnd+1]
|
|
try:
|
|
styles = json.loads(jsonPart)
|
|
self.logger.info("Successfully extracted JSON from explanatory text")
|
|
except json.JSONDecodeError:
|
|
self.logger.warning("Could not extract valid JSON from response, using defaults")
|
|
return defaultStyles
|
|
else:
|
|
return defaultStyles
|
|
else:
|
|
# Try to extract just the JSON part if it's embedded in text
|
|
jsonStart = result.find('{')
|
|
jsonEnd = result.rfind('}')
|
|
if jsonStart != -1 and jsonEnd != -1 and jsonEnd > jsonStart:
|
|
jsonPart = result[jsonStart:jsonEnd+1]
|
|
try:
|
|
styles = json.loads(jsonPart)
|
|
self.logger.info("Successfully extracted JSON from explanatory text")
|
|
except json.JSONDecodeError:
|
|
self.logger.warning("Could not extract valid JSON from response, using defaults")
|
|
return defaultStyles
|
|
else:
|
|
return defaultStyles
|
|
|
|
# Convert colors to appropriate format
|
|
styles = self._convertColorsFormat(styles)
|
|
|
|
return styles
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
|
|
return defaultStyles
|
|
|
|
def _convertColorsFormat(self, styles: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Convert colors to appropriate format based on renderer type.
|
|
Override this method in subclasses for format-specific color handling.
|
|
"""
|
|
return styles
|
|
|
|
def _createAiStyleTemplate(self, formatName: str, userPrompt: str, styleSchema: Dict[str, Any]) -> str:
|
|
"""
|
|
Create a standardized AI style template for any format.
|
|
|
|
Args:
|
|
formatName: Name of the format (e.g., "docx", "xlsx", "pptx")
|
|
userPrompt: User's original prompt
|
|
styleSchema: Format-specific style schema
|
|
|
|
Returns:
|
|
Formatted prompt string
|
|
"""
|
|
schemaJson = json.dumps(styleSchema, indent=4)
|
|
|
|
# DEBUG: Show the schema being sent
|
|
|
|
return f"""You are a professional document styling expert. Generate a complete JSON styling configuration for {formatName.upper()} documents.
|
|
|
|
User request: {userPrompt}
|
|
|
|
Use this schema as a template:
|
|
{schemaJson}
|
|
|
|
Requirements:
|
|
- Return ONLY the complete JSON object (no markdown, no explanations)
|
|
- If the user request contains style/formatting/design instructions (in any language), customize the styling accordingly (adapt styles and add styles if needed)
|
|
- If the user request has NO style instructions, return the default schema values unchanged
|
|
- Ensure all objects are properly closed with closing braces
|
|
- Only modify styles if style instructions are present in the user request
|
|
|
|
Return the complete JSON:""" |