gateway/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py
2026-03-06 14:03:18 +01:00

484 lines
No EOL
20 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Base renderer class for all format renderers.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Tuple, Optional
from modules.datamodels.datamodelJson import supportedSectionTypes
from modules.datamodels.datamodelDocument import RenderedDocument
import json
import logging
import re
from datetime import datetime, UTC
import base64
import io
from PIL import Image
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
logger = logging.getLogger(__name__)
class BaseRenderer(ABC):
    """Base class for all format renderers.

    Subclasses implement ``render`` and may override the class-level
    capability hooks (supported formats, aliases, priority, output style,
    accepted section types). The remaining methods are shared helpers for
    reading the standardized report schema
    ``{metadata: {...}, documents: [{sections: [...]}]}``, handling base64
    images, and requesting styling definitions from an AI service.
    """

    # MIME type -> file extension used by _determineFilename.
    # Unknown MIME types fall back to "txt".
    _MIME_TYPE_EXTENSIONS = {
        "text/html": "html",
        "application/pdf": "pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
        "text/plain": "txt",
        "text/markdown": "md",
        "application/json": "json",
        "text/csv": "csv",
    }

    def __init__(self, services=None):
        """Store the module logger and the optional services container.

        Args:
            services: Optional services object. When present, its ``utils``
                attribute is used for debug-file output during AI styling.
        """
        self.logger = logger
        self.services = services

    @classmethod
    def getSupportedFormats(cls) -> List[str]:
        """
        Return list of supported format names for this renderer.
        Override this method in subclasses to specify supported formats.
        """
        return []

    @classmethod
    def getFormatAliases(cls) -> List[str]:
        """
        Return list of format aliases for this renderer.
        Override this method in subclasses to specify format aliases.
        """
        return []

    @classmethod
    def getPriority(cls) -> int:
        """
        Return priority for this renderer (higher number = higher priority).
        Used when multiple renderers support the same format.
        """
        return 0

    @classmethod
    def getOutputStyle(cls, formatName: Optional[str] = None) -> str:
        """
        Return the output style classification for this renderer.

        Returns 'code', 'document', 'image', or another style name.
        The base implementation always returns 'document'; override in
        subclasses to specify a different style.

        Args:
            formatName: Optional format name (e.g., 'txt', 'js', 'csv') - useful for
                renderers that handle multiple formats with different styles.
        """
        return 'document'

    @classmethod
    def getAcceptedSectionTypes(cls, formatName: Optional[str] = None) -> List[str]:
        """
        Return list of section content types that this renderer accepts.

        The base implementation accepts every entry of ``supportedSectionTypes``
        and returns a fresh list, so callers may mutate the result safely.
        Override this method in subclasses to restrict accepted types.

        Args:
            formatName: Optional format name (e.g., 'txt', 'js', 'csv') - useful for
                renderers that handle multiple formats with different accepted types.

        Returns:
            List of accepted section content types (e.g., ["table", "paragraph", "heading"]).
        """
        return list(supportedSectionTypes)

    @abstractmethod
    async def render(self, extractedContent: Dict[str, Any], title: str, userPrompt: Optional[str] = None, aiService=None) -> List[RenderedDocument]:
        """
        Render extracted JSON content to one or more documents.

        Each renderer must implement this method.

        Args:
            extractedContent: Structured JSON content with sections and metadata.
            title: Report title.
            userPrompt: Original user prompt for context.
            aiService: AI service instance for additional processing.

        Returns:
            List of RenderedDocument objects. The first document is the main
            document, additional documents are supporting files (e.g., images).
            A single document must still be wrapped in a list.
        """
        pass

    def _determineFilename(self, title: str, mimeType: str) -> str:
        """Build a filesystem-friendly filename from a title and MIME type.

        Every character outside ``[a-zA-Z0-9._-]`` becomes an underscore, runs
        of underscores are collapsed, and leading/trailing underscores are
        stripped. An empty (or ``None``) title yields ``"document"``.

        Args:
            title: Human-readable title; ``None`` is treated as empty.
            mimeType: MIME type used to pick the extension (default "txt").

        Returns:
            ``"<sanitized title>.<extension>"``.
        """
        extension = self._MIME_TYPE_EXTENSIONS.get(mimeType, "txt")
        sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", title or "")
        sanitized = re.sub(r"_+", "_", sanitized).strip("_")
        return f"{sanitized or 'document'}.{extension}"

    def _extractSections(self, reportData: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Collect the sections of all documents in the standardized schema.

        Documents that are not dicts, lack a "sections" key, or whose
        "sections" value is not a list are skipped.

        Raises:
            ValueError: If "documents" is missing, is not a non-empty list,
                or no document contributes any section.
        """
        if "documents" not in reportData:
            raise ValueError("Report data must follow standardized schema with 'documents' array")
        documents = reportData.get("documents", [])
        if not isinstance(documents, list) or not documents:
            raise ValueError("Standardized schema must contain at least one document in 'documents' array")

        allSections: List[Dict[str, Any]] = []
        for doc in documents:
            if not isinstance(doc, dict):
                continue
            sections = doc.get("sections")
            if isinstance(sections, list):
                allSections.extend(sections)

        if not allSections:
            raise ValueError("No sections found in any document")
        return allSections

    def _extractMetadata(self, reportData: Dict[str, Any]) -> Dict[str, Any]:
        """
        Return the "metadata" dictionary of the standardized schema.

        Raises:
            ValueError: If "metadata" is missing or is not a dictionary.
        """
        if "metadata" not in reportData:
            raise ValueError("Report data must follow standardized schema with 'metadata' field")
        metadata = reportData["metadata"]
        if not isinstance(metadata, dict):
            raise ValueError("Metadata in standardized schema must be a dictionary")
        return metadata

    def _getTitle(self, reportData: Dict[str, Any], fallbackTitle: str) -> str:
        """Return the metadata title, or ``fallbackTitle`` when none is usable.

        The fallback is used when the report data or its metadata is not a
        dict, or when the title is missing, ``None`` or empty.
        """
        metadata = reportData.get("metadata") if isinstance(reportData, dict) else None
        if not isinstance(metadata, dict):
            return fallbackTitle
        return metadata.get("title") or fallbackTitle

    def _validateJsonStructure(self, jsonContent: Dict[str, Any]) -> bool:
        """
        Check that JSON content follows the standardized schema.

        Requires a dict "metadata", a non-empty list "documents", and a first
        document that is a dict with a list "sections" whose entries are dicts
        carrying both "content_type" and "elements". Only the first document
        is inspected.
        """
        if not isinstance(jsonContent, dict):
            return False
        if not isinstance(jsonContent.get("metadata"), dict):
            return False

        documents = jsonContent.get("documents")
        if not isinstance(documents, list) or not documents:
            return False

        firstDoc = documents[0]
        if not isinstance(firstDoc, dict):
            return False
        sections = firstDoc.get("sections")
        if not isinstance(sections, list):
            return False

        return all(
            isinstance(section, dict) and "content_type" in section and "elements" in section
            for section in sections
        )

    def _getSectionType(self, section: Dict[str, Any]) -> str:
        """Get the type of a section; default to 'paragraph' for non-dict inputs."""
        if isinstance(section, dict):
            return section.get("content_type", "paragraph")
        # Lists and any other type are treated as paragraph elements.
        return "paragraph"

    def _getSectionData(self, section: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Get the elements of a section; if a list is provided directly, return it."""
        if isinstance(section, dict):
            return section.get("elements", [])
        if isinstance(section, list):
            return section
        return []

    def _getSectionId(self, section: Dict[str, Any]) -> str:
        """Get the ID of a section, or "unknown" when unavailable."""
        if isinstance(section, dict):
            return section.get("id", "unknown")
        return "unknown"

    def _validateImageData(self, base64Data: str, altText: str) -> bool:
        """Return True when image data is present, has alt text and is valid base64.

        Each failure reason is logged as a warning. Decoding uses strict
        validation, so characters outside the base64 alphabet are rejected.
        """
        if not base64Data:
            self.logger.warning("Image section has no base64 data")
            return False
        if not altText:
            self.logger.warning("Image section has no alt text")
            return False
        try:
            base64.b64decode(base64Data, validate=True)
            return True
        except Exception as e:
            self.logger.warning(f"Invalid base64 image data: {str(e)}")
            return False

    def _getImageDimensions(self, base64Data: str) -> Tuple[int, int]:
        """
        Get image dimensions from base64 data.

        Returns:
            ``(width, height)``, or ``(0, 0)`` when the data cannot be decoded
            or opened as an image (a warning is logged in that case).
        """
        try:
            imageData = base64.b64decode(base64Data)
            image = Image.open(io.BytesIO(imageData))
            return image.size
        except Exception as e:
            self.logger.warning(f"Could not determine image dimensions: {str(e)}")
            return (0, 0)

    def _resizeImageIfNeeded(self, base64Data: str, maxWidth: int = 800, maxHeight: int = 600) -> str:
        """
        Shrink an image that exceeds the maximum dimensions, keeping aspect ratio.

        Args:
            base64Data: Base64-encoded image.
            maxWidth: Maximum allowed width in pixels.
            maxHeight: Maximum allowed height in pixels.

        Returns:
            The resized image as a base64 string, or the original string when
            no resize is needed or the resize fails (a warning is logged).
        """
        try:
            imageData = base64.b64decode(base64Data)
            image = Image.open(io.BytesIO(imageData))

            width, height = image.size
            if width <= maxWidth and height <= maxHeight:
                return base64Data

            ratio = min(maxWidth / width, maxHeight / height)
            # Clamp to 1 pixel: extreme aspect ratios would otherwise truncate
            # a dimension to 0, which makes the resize fail.
            newWidth = max(1, int(width * ratio))
            newHeight = max(1, int(height * ratio))

            resizedImage = image.resize((newWidth, newHeight), Image.Resampling.LANCZOS)

            buffer = io.BytesIO()
            # Keep the source format when known; fall back to PNG otherwise.
            resizedImage.save(buffer, format=image.format or 'PNG')
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            self.logger.warning(f"Could not resize image: {str(e)}")
            return base64Data

    def _getSupportedSectionTypes(self) -> List[str]:
        """Return a copy of the supported section types (from unified schema)."""
        # Return a copy so callers cannot mutate the shared module-level list.
        return list(supportedSectionTypes)

    def _isValidSectionType(self, sectionType: str) -> bool:
        """Check if a section type is valid."""
        return sectionType in self._getSupportedSectionTypes()

    def _formatTimestamp(self, timestamp: Optional[str] = None) -> str:
        """Return ``timestamp`` unchanged, or the current UTC time when it is empty.

        The generated value has the form ``YYYY-MM-DD HH:MM:SS UTC``.
        """
        if timestamp:
            return timestamp
        return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")

    # ===== GENERIC AI STYLING HELPERS =====

    async def _getAiStyles(self, aiService, styleTemplate: str, defaultStyles: Dict[str, Any]) -> Dict[str, Any]:
        """
        Ask the AI service for styling definitions, falling back to defaults.

        Args:
            aiService: AI service instance; when falsy, defaults are returned.
            styleTemplate: Format-specific style prompt sent to the AI.
            defaultStyles: Styles returned whenever the AI call fails, returns
                nothing, or returns something that cannot be parsed into a
                JSON object.

        Returns:
            The parsed styles passed through ``_convertColorsFormat``, or
            ``defaultStyles``. This method never raises; failures are logged.
        """
        if not aiService:
            return defaultStyles
        try:
            requestOptions = AiCallOptions()
            requestOptions.operationType = OperationTypeEnum.DATA_GENERATE
            request = AiCallRequest(prompt=styleTemplate, context="", options=requestOptions)

            self.logger.debug("AI Style Template Prompt:")
            self.logger.debug("%s", styleTemplate)

            response = await aiService.callAi(request)
            responseText = response.content if response and response.content else ""

            self._scheduleStylingDebugFiles(styleTemplate, responseText)

            result = responseText.strip()
            if not result:
                self.logger.warning("AI styling returned empty response, using defaults")
                return defaultStyles

            styles = self._parseAiStylesJson(result)
            if styles is None:
                return defaultStyles

            return self._convertColorsFormat(styles)
        except Exception as e:
            self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
            return defaultStyles

    def _scheduleStylingDebugFiles(self, styleTemplate: str, responseText: str) -> None:
        """Write the styling prompt/response debug files on a background thread.

        Fire and forget: debug output must never block or break rendering.
        Does nothing when no ``services.utils`` object is available.
        """
        utils = getattr(getattr(self, "services", None), "utils", None)
        if utils is None:
            return

        import threading

        def _writeDebugFiles():
            try:
                utils.writeDebugFile(styleTemplate, "renderer_styling_prompt")
                utils.writeDebugFile(responseText, "renderer_styling_response")
            except Exception as e:
                # Best effort only; record the failure without affecting rendering.
                self.logger.debug(f"Could not write styling debug files: {e}")

        threading.Thread(target=_writeDebugFiles, daemon=True).start()

    def _logStylingDebug(self, message: str) -> None:
        """Send a message to the services debug log, if one is available."""
        utils = getattr(getattr(self, "services", None), "utils", None)
        if utils is None:
            return
        try:
            utils.debugLogToFile(message, "RENDERER")
        except Exception as e:
            self.logger.debug(f"Could not write renderer debug log: {e}")

    def _parseAiStylesJson(self, result: str) -> Optional[Dict[str, Any]]:
        """Parse an AI styling response into a dict.

        Strips a surrounding markdown code fence, parses the JSON and, when
        that fails, tries the repairs in ``_repairAiStylesJson``.

        Returns:
            The parsed JSON object, or ``None`` when nothing parseable was
            found or the parsed value is not a JSON object.
        """
        jsonMatch = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
        if jsonMatch:
            result = jsonMatch.group(1).strip()
        elif result.startswith('```'):
            # Covers both "```json..." and bare "```..." fences.
            result = re.sub(r'^```(?:json)?\s*', '', result)
            result = re.sub(r'\s*```$', '', result)

        try:
            styles = json.loads(result)
        except json.JSONDecodeError as jsonError:
            self.logger.warning(f"AI styling returned invalid JSON: {jsonError}")
            # The debug file receives the full text in addition to the logger output.
            self._logStylingDebug(f"FULL AI RESPONSE THAT FAILED TO PARSE: {result}")
            self._logStylingDebug(f"RESPONSE LENGTH: {len(result)} characters")
            self.logger.warning(f"Raw content that failed to parse: {result}")
            styles = self._repairAiStylesJson(result)
            if styles is None:
                return None

        if not isinstance(styles, dict):
            self.logger.warning("AI styling did not return a JSON object, using defaults")
            return None
        return styles

    def _repairAiStylesJson(self, result: str) -> Optional[Any]:
        """Try to recover JSON from a response that failed to parse.

        Two repairs are attempted in order: appending closing braces when
        there are more ``{`` than ``}``, then parsing only the span from the
        first ``{`` to the last ``}`` (for JSON embedded in explanatory text).

        Returns:
            The parsed value, or ``None`` when both repairs fail.
        """
        openBraces = result.count('{')
        closeBraces = result.count('}')
        if openBraces > closeBraces:
            missingBraces = openBraces - closeBraces
            result = result + '}' * missingBraces
            self.logger.info(f"Added {missingBraces} missing closing brace(s)")
            self.logger.debug(f"Fixed JSON: {result}")
            try:
                styles = json.loads(result)
                self.logger.info("Successfully fixed incomplete JSON")
                return styles
            except json.JSONDecodeError as fixError:
                self.logger.warning(f"Fixed JSON still invalid: {fixError}")
                self.logger.warning(f"Fixed JSON content: {result}")

        jsonStart = result.find('{')
        jsonEnd = result.rfind('}')
        if jsonStart != -1 and jsonEnd > jsonStart:
            try:
                styles = json.loads(result[jsonStart:jsonEnd + 1])
                self.logger.info("Successfully extracted JSON from explanatory text")
                return styles
            except json.JSONDecodeError:
                pass

        self.logger.warning("Could not extract valid JSON from response, using defaults")
        return None

    def _convertColorsFormat(self, styles: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert colors to appropriate format based on renderer type.

        The base implementation returns ``styles`` unchanged. Override this
        method in subclasses for format-specific color handling.
        """
        return styles

    def _createAiStyleTemplate(self, formatName: str, userPrompt: str, styleSchema: Dict[str, Any]) -> str:
        """
        Create a standardized AI style prompt for any format.

        Args:
            formatName: Name of the format (e.g., "docx", "xlsx", "pptx");
                it is upper-cased in the prompt.
            userPrompt: User's original prompt.
            styleSchema: Format-specific style schema, embedded as indented JSON.

        Returns:
            Formatted prompt string.
        """
        schemaJson = json.dumps(styleSchema, indent=4)
        return f"""You are a professional document styling expert. Generate a complete JSON styling configuration for {formatName.upper()} documents.
User request: {userPrompt}
Use this schema as a template:
{schemaJson}
Requirements:
- Return ONLY the complete JSON object (no markdown, no explanations)
- If the user request contains style/formatting/design instructions (in any language), customize the styling accordingly (adapt styles and add styles if needed)
- If the user request has NO style instructions, return the default schema values unchanged
- Ensure all objects are properly closed with closing braces
- Only modify styles if style instructions are present in the user request
Return the complete JSON:"""