459 lines
No EOL
19 KiB
Python
459 lines
No EOL
19 KiB
Python
"""
|
|
Base renderer class for all format renderers.
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, Any, Tuple, List
|
|
import logging
|
|
import json
|
|
|
|
# Module-level logger; BaseRenderer.__init__ stores it on each instance as
# ``self.logger`` so all renderer helpers log through the same logger.
logger = logging.getLogger(__name__)
|
|
|
|
class BaseRenderer(ABC):
    """Base class for all format renderers.

    Concrete renderers implement :meth:`render` and may override the
    ``get_*`` classmethods, :meth:`_render_image_section` and
    :meth:`_convert_colors_format`. The remaining helpers read the
    section-based report structure validated by
    :meth:`_validate_json_structure`, i.e. a dict with a ``"sections"`` list
    whose entries are dicts carrying ``"type"`` and ``"data"`` keys.
    """

    def __init__(self, services=None):
        """Store the module logger and the optional services container.

        Args:
            services: Optional service container. Within this class it is
                only used for ``services.utils.debugLogToFile`` diagnostics
                when AI styling returns unparsable JSON.
        """
        self.logger = logger
        self.services = services

    @classmethod
    def get_supported_formats(cls) -> List[str]:
        """
        Return list of supported format names for this renderer.
        Override this method in subclasses to specify supported formats.
        """
        return []

    @classmethod
    def get_format_aliases(cls) -> List[str]:
        """
        Return list of format aliases for this renderer.
        Override this method in subclasses to specify format aliases.
        """
        return []

    @classmethod
    def get_priority(cls) -> int:
        """
        Return priority for this renderer (higher number = higher priority).
        Used when multiple renderers support the same format.
        """
        return 0

    @abstractmethod
    async def render(self, extracted_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> Tuple[str, str]:
        """
        Render extracted JSON content to the target format.

        Args:
            extracted_content: Structured JSON content with sections and metadata
            title: Report title
            user_prompt: Original user prompt for context
            ai_service: AI service instance for additional processing

        Returns:
            tuple: (rendered_content, mime_type)
        """
        pass

    # ===== REPORT STRUCTURE HELPERS =====

    def _extract_sections(self, report_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return ``report_data["sections"]``, or an empty list when absent."""
        return report_data.get('sections', [])

    def _extract_metadata(self, report_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``report_data["metadata"]``, or an empty dict when absent."""
        return report_data.get('metadata', {})

    def _get_title(self, report_data: Dict[str, Any], fallback_title: str) -> str:
        """Return the metadata title, or ``fallback_title`` when there is none.

        A missing ``metadata`` entry, a ``None`` metadata value, and a missing,
        ``None`` or empty ``title`` all yield the fallback.
        """
        metadata = report_data.get('metadata') or {}
        title = metadata.get('title')
        return title if title else fallback_title

    def _validate_json_structure(self, json_content: Dict[str, Any]) -> bool:
        """Validate that JSON content has the expected structure.

        Returns:
            True only when ``json_content`` is a dict whose ``"sections"``
            value is a list of dicts that each contain ``"type"`` and
            ``"data"`` keys.
        """
        if not isinstance(json_content, dict):
            return False

        # A missing key yields None here, which fails the list check below.
        sections = json_content.get("sections")
        if not isinstance(sections, list):
            return False

        return all(
            isinstance(section, dict) and "type" in section and "data" in section
            for section in sections
        )

    def _get_section_type(self, section: Dict[str, Any]) -> str:
        """Return the section's ``"type"``, defaulting to ``"paragraph"``."""
        return section.get("type", "paragraph")

    def _get_section_data(self, section: Dict[str, Any]) -> Dict[str, Any]:
        """Return the section's ``"data"``, defaulting to an empty dict."""
        return section.get("data", {})

    def _get_section_id(self, section: Dict[str, Any]) -> str:
        """Return the section's ``"id"``, defaulting to ``"unknown"``."""
        return section.get("id", "unknown")

    # ===== SECTION DATA EXTRACTORS =====

    def _extract_table_data(self, section_data: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]:
        """Return ``(headers, rows)`` from section data; each defaults to ``[]``."""
        return section_data.get("headers", []), section_data.get("rows", [])

    def _extract_bullet_list_items(self, section_data: Dict[str, Any]) -> List[str]:
        """Return the bullet texts found under ``section_data["items"]``.

        String items are taken as-is and dict items contribute their
        ``"text"`` value. Any other item (including dicts without ``"text"``)
        is skipped.
        """
        texts = []
        for item in section_data.get("items", []):
            if isinstance(item, str):
                texts.append(item)
            elif isinstance(item, dict) and "text" in item:
                texts.append(item["text"])
        return texts

    def _extract_heading_data(self, section_data: Dict[str, Any]) -> Tuple[int, str]:
        """Return ``(level, text)``; level defaults to 1 and text to ``""``."""
        return section_data.get("level", 1), section_data.get("text", "")

    def _extract_paragraph_text(self, section_data: Dict[str, Any]) -> str:
        """Return the paragraph ``"text"``, defaulting to ``""``."""
        return section_data.get("text", "")

    def _extract_code_block_data(self, section_data: Dict[str, Any]) -> Tuple[str, str]:
        """Return ``(code, language)``; both default to ``""``."""
        return section_data.get("code", ""), section_data.get("language", "")

    def _extract_image_data(self, section_data: Dict[str, Any]) -> Tuple[str, str]:
        """Return ``(base64_data, alt_text)``.

        ``base64Data`` defaults to ``""`` and ``altText`` to ``"Image"``.
        """
        return section_data.get("base64Data", ""), section_data.get("altText", "Image")

    # ===== IMAGE HELPERS =====

    def _render_image_section(self, section: Dict[str, Any], styles: Dict[str, Any] = None) -> Any:
        """
        Render an image section. This is a base implementation that should be
        overridden by format-specific renderers.

        Args:
            section: Image section data
            styles: Optional styling information (unused by this base
                implementation)

        Returns:
            A plain dict describing the image (type, base64Data, altText,
            width, height, caption).
        """
        section_data = self._get_section_data(section)
        base64_data, alt_text = self._extract_image_data(section_data)

        return {
            "type": "image",
            "base64Data": base64_data,
            "altText": alt_text,
            "width": section_data.get("width", None),
            "height": section_data.get("height", None),
            "caption": section_data.get("caption", ""),
        }

    def _validate_image_data(self, base64_data: str, alt_text: str) -> bool:
        """Return True when the image has alt text and strictly valid base64.

        Each failure reason is logged as a warning before returning False.
        """
        if not base64_data:
            self.logger.warning("Image section has no base64 data")
            return False

        if not alt_text:
            self.logger.warning("Image section has no alt text")
            return False

        import base64

        try:
            # validate=True rejects characters outside the base64 alphabet
            # instead of silently discarding them.
            base64.b64decode(base64_data, validate=True)
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError subclass; TypeError covers
            # inputs that are neither str nor bytes-like.
            self.logger.warning(f"Invalid base64 image data: {str(e)}")
            return False
        return True

    def _get_image_dimensions(self, base64_data: str) -> Tuple[int, int]:
        """
        Get image dimensions from base64 data.
        This is a helper method that format-specific renderers can use.

        Returns:
            ``(width, height)`` as reported by Pillow, or ``(0, 0)`` when the
            data cannot be decoded/opened or Pillow is unavailable.
        """
        try:
            import base64
            import io
            from PIL import Image

            image = Image.open(io.BytesIO(base64.b64decode(base64_data)))
            return image.size  # (width, height)
        except Exception as e:
            # Best effort: callers get a sentinel size rather than an error.
            self.logger.warning(f"Could not determine image dimensions: {str(e)}")
            return (0, 0)

    def _resize_image_if_needed(self, base64_data: str, max_width: int = 800, max_height: int = 600) -> str:
        """
        Resize image if it exceeds maximum dimensions.

        The aspect ratio is preserved. The image is re-encoded in its original
        format (PNG when Pillow reports no format).

        Returns:
            The resized image as a base64 string, or the original
            ``base64_data`` when no resize is needed or anything fails.
        """
        try:
            import base64
            import io
            from PIL import Image

            image = Image.open(io.BytesIO(base64.b64decode(base64_data)))

            width, height = image.size
            if width <= max_width and height <= max_height:
                return base64_data  # already within bounds

            ratio = min(max_width / width, max_height / height)
            # Clamp to 1px so extreme aspect ratios never yield a 0-sized image.
            new_size = (max(1, int(width * ratio)), max(1, int(height * ratio)))

            # Image.Resampling was added in Pillow 9.1; older releases expose
            # the filter constant directly on the Image module.
            lanczos = getattr(Image, "Resampling", Image).LANCZOS
            resized_image = image.resize(new_size, lanczos)

            buffer = io.BytesIO()
            resized_image.save(buffer, format=image.format or 'PNG')
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            # Best effort: a failed resize must not lose the image.
            self.logger.warning(f"Could not resize image: {str(e)}")
            return base64_data

    # ===== SECTION DISPATCH =====

    def _get_supported_section_types(self) -> List[str]:
        """Return list of supported section types."""
        return ["table", "bullet_list", "heading", "paragraph", "code_block", "image"]

    def _is_valid_section_type(self, section_type: str) -> bool:
        """Check if a section type is valid."""
        return section_type in self._get_supported_section_types()

    def _process_section_by_type(self, section: Dict[str, Any]) -> Dict[str, Any]:
        """Process a section and return structured data based on its type.

        Unknown section types are treated as paragraphs. An image section
        whose data fails :meth:`_validate_image_data` is replaced by a
        paragraph placeholder ``"[Image: <alt text>]"``.
        """
        section_type = self._get_section_type(section)
        section_data = self._get_section_data(section)

        if section_type == "table":
            headers, rows = self._extract_table_data(section_data)
            return {"type": "table", "headers": headers, "rows": rows}

        if section_type == "bullet_list":
            items = self._extract_bullet_list_items(section_data)
            return {"type": "bullet_list", "items": items}

        if section_type == "heading":
            level, text = self._extract_heading_data(section_data)
            return {"type": "heading", "level": level, "text": text}

        if section_type == "code_block":
            code, language = self._extract_code_block_data(section_data)
            return {"type": "code_block", "code": code, "language": language}

        if section_type == "image":
            base64_data, alt_text = self._extract_image_data(section_data)
            if not self._validate_image_data(base64_data, alt_text):
                return {"type": "paragraph", "text": f"[Image: {alt_text}]"}
            return {
                "type": "image",
                "base64Data": base64_data,
                "altText": alt_text,
                "width": section_data.get("width"),
                "height": section_data.get("height"),
                "caption": section_data.get("caption", ""),
            }

        # "paragraph" and every unrecognised type share the paragraph shape.
        return {"type": "paragraph", "text": self._extract_paragraph_text(section_data)}

    def _format_timestamp(self, timestamp: str = None) -> str:
        """Return ``timestamp`` unchanged, or the current UTC time when falsy.

        The generated value has the form ``YYYY-MM-DD HH:MM:SS UTC``.
        """
        if timestamp:
            return timestamp
        # timezone.utc is available on every supported Python version,
        # unlike the datetime.UTC alias (3.11+); the output is identical.
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    # ===== GENERIC AI STYLING HELPERS =====

    async def _get_ai_styles(self, ai_service, style_template: str, default_styles: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generic AI styling method that can be used by all renderers.

        Sends ``style_template`` to the AI service, parses the JSON object in
        the reply (tolerating markdown fences, missing closing braces and
        surrounding text) and passes it through
        :meth:`_convert_colors_format`.

        Args:
            ai_service: AI service instance
            style_template: Format-specific style template
            default_styles: Default styles to fall back to

        Returns:
            Dict with styling definitions. ``default_styles`` is returned
            unchanged when no service is given, the reply is empty or
            unparsable, or any error occurs.
        """
        if not ai_service:
            return default_styles

        try:
            # Project-local import kept inside the try so a missing module
            # degrades to the default styles instead of failing the render.
            from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType

            request_options = AiCallOptions()
            request_options.operationType = OperationType.GENERAL
            request = AiCallRequest(prompt=style_template, context="", options=request_options)

            self.logger.debug("AI Style Template Prompt:")
            self.logger.debug(f"{style_template}")

            response = await ai_service.aiObjects.call(request)

            result = response.content.strip() if response and response.content else ""
            if not result:
                self.logger.warning("AI styling returned empty response, using defaults")
                return default_styles

            try:
                styles = self._parse_ai_styles_response(result)
            except ValueError as parse_error:
                self.logger.warning(
                    f"Could not extract valid JSON from response, using defaults ({parse_error})"
                )
                return default_styles

            return self._convert_colors_format(styles)

        except Exception as e:
            # Styling is optional: any failure falls back to the defaults.
            self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
            return default_styles

    def _parse_ai_styles_response(self, result: str) -> Dict[str, Any]:
        """Parse an AI reply into a styles dict, repairing common damage.

        Raises:
            ValueError: when no JSON object can be recovered from ``result``.
        """
        import re

        # Strip a markdown code fence if the model wrapped its answer in one.
        fenced = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
        if fenced:
            result = fenced.group(1).strip()
        elif result.startswith('```json'):
            result = re.sub(r'^```json\s*', '', result)
            result = re.sub(r'\s*```$', '', result)
        elif result.startswith('```'):
            result = re.sub(r'^```\s*', '', result)
            result = re.sub(r'\s*```$', '', result)

        try:
            styles = json.loads(result)
        except json.JSONDecodeError as json_error:
            self.logger.warning(f"AI styling returned invalid JSON: {json_error}")
            # The file log keeps the untruncated response for diagnosis.
            self._debug_log_to_file(f"FULL AI RESPONSE THAT FAILED TO PARSE: {result}")
            self._debug_log_to_file(f"RESPONSE LENGTH: {len(result)} characters")
            self.logger.warning(f"Raw content that failed to parse: {result}")
            styles = self._repair_styles_json(result)

        if not isinstance(styles, dict):
            raise ValueError(f"expected a JSON object, got {type(styles).__name__}")
        return styles

    def _repair_styles_json(self, result: str) -> Any:
        """Recover JSON from a truncated or text-wrapped AI reply.

        First appends any missing closing braces, then falls back to the
        substring between the first ``{`` and the last ``}``.

        Raises:
            ValueError: when neither strategy yields parsable JSON.
        """
        missing_braces = result.count('{') - result.count('}')
        if missing_braces > 0:
            result = result + '}' * missing_braces
            self.logger.info(f"Added {missing_braces} missing closing brace(s)")
            self.logger.debug(f"Fixed JSON: {result}")
            try:
                styles = json.loads(result)
            except json.JSONDecodeError as fix_error:
                self.logger.warning(f"Fixed JSON still invalid: {fix_error}")
                self.logger.warning(f"Fixed JSON content: {result}")
            else:
                self.logger.info("Successfully fixed incomplete JSON")
                return styles

        # The object may be embedded in explanatory text.
        json_start = result.find('{')
        json_end = result.rfind('}')
        if json_start == -1 or json_end <= json_start:
            raise ValueError("no JSON object found in AI response")

        # json.JSONDecodeError is a ValueError, so a failure here propagates
        # to the caller with the documented exception type.
        styles = json.loads(result[json_start:json_end + 1])
        self.logger.info("Successfully extracted JSON from explanatory text")
        return styles

    def _debug_log_to_file(self, message: str) -> None:
        """Write ``message`` via ``services.utils.debugLogToFile`` if present.

        ``services`` is optional (it defaults to None in ``__init__``), so
        this diagnostic must never be allowed to abort JSON recovery.
        """
        utils = getattr(getattr(self, "services", None), "utils", None)
        debug_log = getattr(utils, "debugLogToFile", None)
        if not callable(debug_log):
            return
        try:
            debug_log(message, "RENDERER")
        except Exception as e:
            self.logger.debug(f"debugLogToFile failed: {str(e)}")

    def _convert_colors_format(self, styles: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert colors to appropriate format based on renderer type.
        Override this method in subclasses for format-specific color handling.
        The base implementation returns ``styles`` unchanged.
        """
        return styles

    def _create_ai_style_template(self, format_name: str, user_prompt: str, style_schema: Dict[str, Any]) -> str:
        """
        Create a standardized AI style template for any format.

        Args:
            format_name: Name of the format (e.g., "docx", "xlsx", "pptx");
                it is upper-cased in the prompt
            user_prompt: User's original prompt. NOTE: currently not included
                in the generated prompt; kept for interface compatibility.
            style_schema: Format-specific style schema, embedded as JSON
                with 4-space indentation

        Returns:
            Formatted prompt string
        """
        schema_json = json.dumps(style_schema, indent=4)

        # Joined line by line so source indentation never leaks into the prompt.
        return "\n".join([
            f"You are a professional document styling expert. Generate a complete JSON styling configuration for {format_name.upper()} documents.",
            "",
            "Use this schema as a template and customize the values for professional document styling:",
            "",
            schema_json,
            "",
            "Requirements:",
            "- Return ONLY the complete JSON object (no markdown, no explanations)",
            "- Customize colors, fonts, and spacing for professional appearance",
            "- Ensure all objects are properly closed with closing braces",
            "- Make the styling modern and professional",
            "",
            "Return the complete JSON:",
        ])