gateway/modules/services/serviceGeneration/renderers/rendererBaseTemplate.py

492 lines
No EOL
21 KiB
Python

"""
Base renderer class for all format renderers.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Tuple, List
from modules.datamodels.datamodelJson import supportedSectionTypes
import json
import logging
import re
from datetime import datetime, UTC
import base64
import io
from PIL import Image
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
logger = logging.getLogger(__name__)
class BaseRenderer(ABC):
"""Base class for all format renderers."""
def __init__(self, services=None):
self.logger = logger
self.services = services # Add services attribute
@classmethod
def getSupportedFormats(cls) -> List[str]:
"""
Return list of supported format names for this renderer.
Override this method in subclasses to specify supported formats.
"""
return []
@classmethod
def getFormatAliases(cls) -> List[str]:
"""
Return list of format aliases for this renderer.
Override this method in subclasses to specify format aliases.
"""
return []
@classmethod
def getPriority(cls) -> int:
"""
Return priority for this renderer (higher number = higher priority).
Used when multiple renderers support the same format.
"""
return 0
@abstractmethod
async def render(self, extractedContent: Dict[str, Any], title: str, userPrompt: str = None, aiService=None) -> Tuple[str, str]:
"""
Render extracted JSON content to the target format.
Args:
extractedContent: Structured JSON content with sections and metadata
title: Report title
userPrompt: Original user prompt for context
aiService: AI service instance for additional processing
Returns:
tuple: (renderedContent, mimeType)
"""
pass
def _extractSections(self, reportData: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extract sections from report data."""
return reportData.get('sections', [])
def _extractMetadata(self, reportData: Dict[str, Any]) -> Dict[str, Any]:
"""Extract metadata from report data."""
return reportData.get('metadata', {})
def _getTitle(self, reportData: Dict[str, Any], fallbackTitle: str) -> str:
"""Get title from report data or use fallback."""
metadata = reportData.get('metadata', {})
return metadata.get('title', fallbackTitle)
def _validateJsonStructure(self, jsonContent: Dict[str, Any]) -> bool:
"""Validate that JSON content has the expected structure."""
if not isinstance(jsonContent, dict):
return False
if "sections" not in jsonContent:
return False
sections = jsonContent.get("sections", [])
if not isinstance(sections, list):
return False
# Validate each section has content_type and elements
for section in sections:
if not isinstance(section, dict):
return False
if "content_type" not in section or "elements" not in section:
return False
return True
def _getSectionType(self, section: Dict[str, Any]) -> str:
"""Get the type of a section; default to 'paragraph' for non-dict inputs."""
if isinstance(section, dict):
return section.get("content_type", "paragraph")
# If section is a list or any other type, treat as paragraph elements
return "paragraph"
def _getSectionData(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Get the elements of a section; if a list is provided directly, return it."""
if isinstance(section, dict):
return section.get("elements", [])
if isinstance(section, list):
return section
return []
def _getSectionId(self, section: Dict[str, Any]) -> str:
"""Get the ID of a section (if available)."""
if isinstance(section, dict):
return section.get("id", "unknown")
return "unknown"
def _extractTableData(self, sectionData: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]:
"""Extract table headers and rows from section data."""
# Normalize when elements array was passed in
if isinstance(sectionData, list) and sectionData:
candidate = sectionData[0]
sectionData = candidate if isinstance(candidate, dict) else {}
headers = sectionData.get("headers", [])
rows = sectionData.get("rows", [])
return headers, rows
def _extractBulletListItems(self, sectionData: Dict[str, Any]) -> List[str]:
"""Extract bullet list items from section data."""
# Normalize when elements array or raw list was passed in
if isinstance(sectionData, list):
# Already a list of items (strings or dicts)
items = sectionData
else:
items = sectionData.get("items", [])
result = []
for item in items:
if isinstance(item, str):
result.append(item)
elif isinstance(item, dict) and "text" in item:
result.append(item["text"])
return result
def _extractHeadingData(self, sectionData: Dict[str, Any]) -> Tuple[int, str]:
"""Extract heading level and text from section data."""
# Normalize when elements array was passed in
if isinstance(sectionData, list) and sectionData:
sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {}
level = sectionData.get("level", 1)
text = sectionData.get("text", "")
return level, text
def _extractParagraphText(self, sectionData: Dict[str, Any]) -> str:
"""Extract paragraph text from section data."""
if isinstance(sectionData, list):
# Join multiple paragraph elements if provided as a list
texts = []
for el in sectionData:
if isinstance(el, dict) and "text" in el:
texts.append(el["text"])
elif isinstance(el, str):
texts.append(el)
return "\n".join(texts)
return sectionData.get("text", "")
def _extractCodeBlockData(self, sectionData: Dict[str, Any]) -> Tuple[str, str]:
"""Extract code and language from section data."""
# Normalize when elements array was passed in
if isinstance(sectionData, list) and sectionData:
sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {}
code = sectionData.get("code", "")
language = sectionData.get("language", "")
return code, language
def _extractImageData(self, sectionData: Dict[str, Any]) -> Tuple[str, str]:
"""Extract base64 data and alt text from section data."""
# Normalize when elements array was passed in
if isinstance(sectionData, list) and sectionData:
sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {}
base64Data = sectionData.get("base64Data", "")
altText = sectionData.get("altText", "Image")
return base64Data, altText
def _renderImageSection(self, section: Dict[str, Any], styles: Dict[str, Any] = None) -> Any:
"""
Render an image section. This is a base implementation that should be overridden
by format-specific renderers.
Args:
section: Image section data
styles: Optional styling information
Returns:
Format-specific image representation
"""
sectionData = self._getSectionData(section)
base64Data, altText = self._extractImageData(sectionData)
# Base implementation returns a simple dict
# Format-specific renderers should override this method
return {
"content_type": "image",
"base64Data": base64Data,
"altText": altText,
"width": sectionData.get("width", None),
"height": sectionData.get("height", None),
"caption": sectionData.get("caption", "")
}
def _validateImageData(self, base64Data: str, altText: str) -> bool:
"""Validate image data."""
if not base64Data:
self.logger.warning("Image section has no base64 data")
return False
if not altText:
self.logger.warning("Image section has no alt text")
return False
# Basic base64 validation
try:
base64.b64decode(base64Data, validate=True)
return True
except Exception as e:
self.logger.warning(f"Invalid base64 image data: {str(e)}")
return False
def _getImageDimensions(self, base64Data: str) -> Tuple[int, int]:
"""
Get image dimensions from base64 data.
This is a helper method that format-specific renderers can use.
"""
try:
# Decode base64 data
imageData = base64.b64decode(base64Data)
image = Image.open(io.BytesIO(imageData))
return image.size # Returns (width, height)
except Exception as e:
self.logger.warning(f"Could not determine image dimensions: {str(e)}")
return (0, 0)
def _resizeImageIfNeeded(self, base64Data: str, maxWidth: int = 800, maxHeight: int = 600) -> str:
"""
Resize image if it exceeds maximum dimensions.
Returns the resized image as base64 string.
"""
try:
# Decode base64 data
imageData = base64.b64decode(base64Data)
image = Image.open(io.BytesIO(imageData))
# Check if resizing is needed
width, height = image.size
if width <= maxWidth and height <= maxHeight:
return base64Data # No resizing needed
# Calculate new dimensions maintaining aspect ratio
ratio = min(maxWidth / width, maxHeight / height)
newWidth = int(width * ratio)
newHeight = int(height * ratio)
# Resize image
resizedImage = image.resize((newWidth, newHeight), Image.Resampling.LANCZOS)
# Convert back to base64
buffer = io.BytesIO()
resizedImage.save(buffer, format=image.format or 'PNG')
resizedData = buffer.getvalue()
return base64.b64encode(resizedData).decode('utf-8')
except Exception as e:
self.logger.warning(f"Could not resize image: {str(e)}")
return base64Data # Return original if resize fails
def _getSupportedSectionTypes(self) -> List[str]:
"""Return list of supported section types (from unified schema)."""
return supportedSectionTypes
def _isValidSectionType(self, sectionType: str) -> bool:
"""Check if a section type is valid."""
return sectionType in self._getSupportedSectionTypes()
def _processSectionByType(self, section: Dict[str, Any]) -> Dict[str, Any]:
"""Process a section and return structured data based on its type."""
sectionType = self._getSectionType(section)
sectionData = self._getSectionData(section)
if sectionType == "table":
headers, rows = self._extractTableData(sectionData)
return {"content_type": "table", "headers": headers, "rows": rows}
elif sectionType == "bullet_list":
items = self._extractBulletListItems(sectionData)
return {"content_type": "bullet_list", "items": items}
elif sectionType == "heading":
level, text = self._extractHeadingData(sectionData)
return {"content_type": "heading", "level": level, "text": text}
elif sectionType == "paragraph":
text = self._extractParagraphText(sectionData)
return {"content_type": "paragraph", "text": text}
elif sectionType == "code_block":
code, language = self._extractCodeBlockData(sectionData)
return {"content_type": "code_block", "code": code, "language": language}
elif sectionType == "image":
base64Data, altText = self._extractImageData(sectionData)
# Validate image data
if self._validateImageData(base64Data, altText):
return {
"content_type": "image",
"base64Data": base64Data,
"altText": altText,
"width": sectionData.get("width") if isinstance(sectionData, dict) else None,
"height": sectionData.get("height") if isinstance(sectionData, dict) else None,
"caption": sectionData.get("caption", "") if isinstance(sectionData, dict) else ""
}
else:
# Return placeholder if image data is invalid
return {"content_type": "paragraph", "text": f"[Image: {altText}]"}
else:
# Fallback to paragraph
text = self._extractParagraphText(sectionData)
return {"content_type": "paragraph", "text": text}
def _formatTimestamp(self, timestamp: str = None) -> str:
"""Format timestamp for display."""
if timestamp:
return timestamp
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
# ===== GENERIC AI STYLING HELPERS =====
async def _getAiStyles(self, aiService, styleTemplate: str, defaultStyles: Dict[str, Any]) -> Dict[str, Any]:
"""
Generic AI styling method that can be used by all renderers.
Args:
aiService: AI service instance
styleTemplate: Format-specific style template
defaultStyles: Default styles to fall back to
Returns:
Dict with styling definitions
"""
# DEBUG: Show which renderer is calling this method
if not aiService:
return defaultStyles
try:
requestOptions = AiCallOptions()
requestOptions.operationType = OperationTypeEnum.DATA_GENERATE
request = AiCallRequest(prompt=styleTemplate, context="", options=requestOptions)
# DEBUG: Show the actual prompt being sent to AI
self.logger.debug(f"AI Style Template Prompt:")
self.logger.debug(f"{styleTemplate}")
response = await aiService.aiObjects.call(request)
# Save styling prompt and response to debug
self.services.utils.writeDebugFile(styleTemplate, "renderer_styling_prompt")
self.services.utils.writeDebugFile(response.content or '', "renderer_styling_response")
# Clean and parse JSON
result = response.content.strip() if response and response.content else ""
# Check if result is empty
if not result:
self.logger.warning("AI styling returned empty response, using defaults")
return defaultStyles
# Extract JSON from markdown if present
jsonMatch = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
if jsonMatch:
result = jsonMatch.group(1).strip()
elif result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
# Try to parse JSON
try:
styles = json.loads(result)
except json.JSONDecodeError as jsonError:
self.logger.warning(f"AI styling returned invalid JSON: {jsonError}")
# Use print instead of logger to avoid truncation
self.services.utils.debugLogToFile(f"FULL AI RESPONSE THAT FAILED TO PARSE: {result}", "RENDERER")
self.services.utils.debugLogToFile(f"RESPONSE LENGTH: {len(result)} characters", "RENDERER")
self.logger.warning(f"Raw content that failed to parse: {result}")
# Try to fix incomplete JSON by adding missing closing braces
openBraces = result.count('{')
closeBraces = result.count('}')
if openBraces > closeBraces:
# JSON is incomplete, add missing closing braces
missingBraces = openBraces - closeBraces
result = result + '}' * missingBraces
self.logger.info(f"Added {missingBraces} missing closing brace(s)")
self.logger.debug(f"Fixed JSON: {result}")
# Try parsing the fixed JSON
try:
styles = json.loads(result)
self.logger.info("Successfully fixed incomplete JSON")
except json.JSONDecodeError as fixError:
self.logger.warning(f"Fixed JSON still invalid: {fixError}")
self.logger.warning(f"Fixed JSON content: {result}")
# Try to extract just the JSON part if it's embedded in text
jsonStart = result.find('{')
jsonEnd = result.rfind('}')
if jsonStart != -1 and jsonEnd != -1 and jsonEnd > jsonStart:
jsonPart = result[jsonStart:jsonEnd+1]
try:
styles = json.loads(jsonPart)
self.logger.info("Successfully extracted JSON from explanatory text")
except json.JSONDecodeError:
self.logger.warning("Could not extract valid JSON from response, using defaults")
return defaultStyles
else:
return defaultStyles
else:
# Try to extract just the JSON part if it's embedded in text
jsonStart = result.find('{')
jsonEnd = result.rfind('}')
if jsonStart != -1 and jsonEnd != -1 and jsonEnd > jsonStart:
jsonPart = result[jsonStart:jsonEnd+1]
try:
styles = json.loads(jsonPart)
self.logger.info("Successfully extracted JSON from explanatory text")
except json.JSONDecodeError:
self.logger.warning("Could not extract valid JSON from response, using defaults")
return defaultStyles
else:
return defaultStyles
# Convert colors to appropriate format
styles = self._convertColorsFormat(styles)
return styles
except Exception as e:
self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
return defaultStyles
def _convertColorsFormat(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert colors to appropriate format based on renderer type.
Override this method in subclasses for format-specific color handling.
"""
return styles
def _createAiStyleTemplate(self, formatName: str, userPrompt: str, styleSchema: Dict[str, Any]) -> str:
"""
Create a standardized AI style template for any format.
Args:
formatName: Name of the format (e.g., "docx", "xlsx", "pptx")
userPrompt: User's original prompt
styleSchema: Format-specific style schema
Returns:
Formatted prompt string
"""
schemaJson = json.dumps(styleSchema, indent=4)
# DEBUG: Show the schema being sent
return f"""You are a professional document styling expert. Generate a complete JSON styling configuration for {formatName.upper()} documents.
Use this schema as a template and customize the values for professional document styling:
{schemaJson}
Requirements:
- Return ONLY the complete JSON object (no markdown, no explanations)
- Customize colors, fonts, and spacing for professional appearance
- Ensure all objects are properly closed with closing braces
- Make the styling modern and professional
Return the complete JSON:"""