""" Base renderer class for all format renderers. """ from abc import ABC, abstractmethod from typing import Dict, Any, Tuple, List from modules.datamodels.datamodelJson import supportedSectionTypes import json import logging import re from datetime import datetime, UTC import base64 import io from PIL import Image from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum logger = logging.getLogger(__name__) class BaseRenderer(ABC): """Base class for all format renderers.""" def __init__(self, services=None): self.logger = logger self.services = services # Add services attribute @classmethod def getSupportedFormats(cls) -> List[str]: """ Return list of supported format names for this renderer. Override this method in subclasses to specify supported formats. """ return [] @classmethod def getFormatAliases(cls) -> List[str]: """ Return list of format aliases for this renderer. Override this method in subclasses to specify format aliases. """ return [] @classmethod def getPriority(cls) -> int: """ Return priority for this renderer (higher number = higher priority). Used when multiple renderers support the same format. """ return 0 @abstractmethod async def render(self, extractedContent: Dict[str, Any], title: str, userPrompt: str = None, aiService=None) -> Tuple[str, str]: """ Render extracted JSON content to the target format. Args: extractedContent: Structured JSON content with sections and metadata title: Report title userPrompt: Original user prompt for context aiService: AI service instance for additional processing Returns: tuple: (renderedContent, mimeType) """ pass def _extractSections(self, reportData: Dict[str, Any]) -> List[Dict[str, Any]]: """Extract sections from report data.""" return reportData.get('sections', []) def _extractMetadata(self, reportData: Dict[str, Any]) -> Dict[str, Any]: """Extract metadata from report data.""" return reportData.get('metadata', {}) def _getTitle(self, reportData: Dict[str, Any], fallbackTitle: str) -> str: """Get title from report data or use fallback.""" metadata = reportData.get('metadata', {}) return metadata.get('title', fallbackTitle) def _validateJsonStructure(self, jsonContent: Dict[str, Any]) -> bool: """Validate that JSON content has the expected structure.""" if not isinstance(jsonContent, dict): return False if "sections" not in jsonContent: return False sections = jsonContent.get("sections", []) if not isinstance(sections, list): return False # Validate each section has content_type and elements for section in sections: if not isinstance(section, dict): return False if "content_type" not in section or "elements" not in section: return False return True def _getSectionType(self, section: Dict[str, Any]) -> str: """Get the type of a section; default to 'paragraph' for non-dict inputs.""" if isinstance(section, dict): return section.get("content_type", "paragraph") # If section is a list or any other type, treat as paragraph elements return "paragraph" def _getSectionData(self, section: Dict[str, Any]) -> List[Dict[str, Any]]: """Get the elements of a section; if a list is provided directly, return it.""" if isinstance(section, dict): return section.get("elements", []) if isinstance(section, list): return section return [] def _getSectionId(self, section: Dict[str, Any]) -> str: """Get the ID of a section (if available).""" if isinstance(section, dict): return section.get("id", "unknown") return "unknown" def _extractTableData(self, sectionData: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]: """Extract table headers and rows from section data.""" # Normalize when elements array was passed in if isinstance(sectionData, list) and sectionData: candidate = sectionData[0] sectionData = candidate if isinstance(candidate, dict) else {} headers = sectionData.get("headers", []) rows = sectionData.get("rows", []) return headers, rows def _extractBulletListItems(self, sectionData: Dict[str, Any]) -> List[str]: """Extract bullet list items from section data.""" # Normalize when elements array or raw list was passed in if isinstance(sectionData, list): # Already a list of items (strings or dicts) items = sectionData else: items = sectionData.get("items", []) result = [] for item in items: if isinstance(item, str): result.append(item) elif isinstance(item, dict) and "text" in item: result.append(item["text"]) return result def _extractHeadingData(self, sectionData: Dict[str, Any]) -> Tuple[int, str]: """Extract heading level and text from section data.""" # Normalize when elements array was passed in if isinstance(sectionData, list) and sectionData: sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {} level = sectionData.get("level", 1) text = sectionData.get("text", "") return level, text def _extractParagraphText(self, sectionData: Dict[str, Any]) -> str: """Extract paragraph text from section data.""" if isinstance(sectionData, list): # Join multiple paragraph elements if provided as a list texts = [] for el in sectionData: if isinstance(el, dict) and "text" in el: texts.append(el["text"]) elif isinstance(el, str): texts.append(el) return "\n".join(texts) return sectionData.get("text", "") def _extractCodeBlockData(self, sectionData: Dict[str, Any]) -> Tuple[str, str]: """Extract code and language from section data.""" # Normalize when elements array was passed in if isinstance(sectionData, list) and sectionData: sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {} code = sectionData.get("code", "") language = sectionData.get("language", "") return code, language def _extractImageData(self, sectionData: Dict[str, Any]) -> Tuple[str, str]: """Extract base64 data and alt text from section data.""" # Normalize when elements array was passed in if isinstance(sectionData, list) and sectionData: sectionData = sectionData[0] if isinstance(sectionData[0], dict) else {} base64Data = sectionData.get("base64Data", "") altText = sectionData.get("altText", "Image") return base64Data, altText def _renderImageSection(self, section: Dict[str, Any], styles: Dict[str, Any] = None) -> Any: """ Render an image section. This is a base implementation that should be overridden by format-specific renderers. Args: section: Image section data styles: Optional styling information Returns: Format-specific image representation """ sectionData = self._getSectionData(section) base64Data, altText = self._extractImageData(sectionData) # Base implementation returns a simple dict # Format-specific renderers should override this method return { "content_type": "image", "base64Data": base64Data, "altText": altText, "width": sectionData.get("width", None), "height": sectionData.get("height", None), "caption": sectionData.get("caption", "") } def _validateImageData(self, base64Data: str, altText: str) -> bool: """Validate image data.""" if not base64Data: self.logger.warning("Image section has no base64 data") return False if not altText: self.logger.warning("Image section has no alt text") return False # Basic base64 validation try: base64.b64decode(base64Data, validate=True) return True except Exception as e: self.logger.warning(f"Invalid base64 image data: {str(e)}") return False def _getImageDimensions(self, base64Data: str) -> Tuple[int, int]: """ Get image dimensions from base64 data. This is a helper method that format-specific renderers can use. """ try: # Decode base64 data imageData = base64.b64decode(base64Data) image = Image.open(io.BytesIO(imageData)) return image.size # Returns (width, height) except Exception as e: self.logger.warning(f"Could not determine image dimensions: {str(e)}") return (0, 0) def _resizeImageIfNeeded(self, base64Data: str, maxWidth: int = 800, maxHeight: int = 600) -> str: """ Resize image if it exceeds maximum dimensions. Returns the resized image as base64 string. """ try: # Decode base64 data imageData = base64.b64decode(base64Data) image = Image.open(io.BytesIO(imageData)) # Check if resizing is needed width, height = image.size if width <= maxWidth and height <= maxHeight: return base64Data # No resizing needed # Calculate new dimensions maintaining aspect ratio ratio = min(maxWidth / width, maxHeight / height) newWidth = int(width * ratio) newHeight = int(height * ratio) # Resize image resizedImage = image.resize((newWidth, newHeight), Image.Resampling.LANCZOS) # Convert back to base64 buffer = io.BytesIO() resizedImage.save(buffer, format=image.format or 'PNG') resizedData = buffer.getvalue() return base64.b64encode(resizedData).decode('utf-8') except Exception as e: self.logger.warning(f"Could not resize image: {str(e)}") return base64Data # Return original if resize fails def _getSupportedSectionTypes(self) -> List[str]: """Return list of supported section types (from unified schema).""" return supportedSectionTypes def _isValidSectionType(self, sectionType: str) -> bool: """Check if a section type is valid.""" return sectionType in self._getSupportedSectionTypes() def _processSectionByType(self, section: Dict[str, Any]) -> Dict[str, Any]: """Process a section and return structured data based on its type.""" sectionType = self._getSectionType(section) sectionData = self._getSectionData(section) if sectionType == "table": headers, rows = self._extractTableData(sectionData) return {"content_type": "table", "headers": headers, "rows": rows} elif sectionType == "bullet_list": items = self._extractBulletListItems(sectionData) return {"content_type": "bullet_list", "items": items} elif sectionType == "heading": level, text = self._extractHeadingData(sectionData) return {"content_type": "heading", "level": level, "text": text} elif sectionType == "paragraph": text = self._extractParagraphText(sectionData) return {"content_type": "paragraph", "text": text} elif sectionType == "code_block": code, language = self._extractCodeBlockData(sectionData) return {"content_type": "code_block", "code": code, "language": language} elif sectionType == "image": base64Data, altText = self._extractImageData(sectionData) # Validate image data if self._validateImageData(base64Data, altText): return { "content_type": "image", "base64Data": base64Data, "altText": altText, "width": sectionData.get("width") if isinstance(sectionData, dict) else None, "height": sectionData.get("height") if isinstance(sectionData, dict) else None, "caption": sectionData.get("caption", "") if isinstance(sectionData, dict) else "" } else: # Return placeholder if image data is invalid return {"content_type": "paragraph", "text": f"[Image: {altText}]"} else: # Fallback to paragraph text = self._extractParagraphText(sectionData) return {"content_type": "paragraph", "text": text} def _formatTimestamp(self, timestamp: str = None) -> str: """Format timestamp for display.""" if timestamp: return timestamp return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC") # ===== GENERIC AI STYLING HELPERS ===== async def _getAiStyles(self, aiService, styleTemplate: str, defaultStyles: Dict[str, Any]) -> Dict[str, Any]: """ Generic AI styling method that can be used by all renderers. Args: aiService: AI service instance styleTemplate: Format-specific style template defaultStyles: Default styles to fall back to Returns: Dict with styling definitions """ # DEBUG: Show which renderer is calling this method if not aiService: return defaultStyles try: requestOptions = AiCallOptions() requestOptions.operationType = OperationTypeEnum.DATA_GENERATE request = AiCallRequest(prompt=styleTemplate, context="", options=requestOptions) # DEBUG: Show the actual prompt being sent to AI self.logger.debug(f"AI Style Template Prompt:") self.logger.debug(f"{styleTemplate}") response = await aiService.callAi(request) # Save styling prompt and response to debug self.services.utils.writeDebugFile(styleTemplate, "renderer_styling_prompt") self.services.utils.writeDebugFile(response.content or '', "renderer_styling_response") # Clean and parse JSON result = response.content.strip() if response and response.content else "" # Check if result is empty if not result: self.logger.warning("AI styling returned empty response, using defaults") return defaultStyles # Extract JSON from markdown if present jsonMatch = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL) if jsonMatch: result = jsonMatch.group(1).strip() elif result.startswith('```json'): result = re.sub(r'^```json\s*', '', result) result = re.sub(r'\s*```$', '', result) elif result.startswith('```'): result = re.sub(r'^```\s*', '', result) result = re.sub(r'\s*```$', '', result) # Try to parse JSON try: styles = json.loads(result) except json.JSONDecodeError as jsonError: self.logger.warning(f"AI styling returned invalid JSON: {jsonError}") # Use print instead of logger to avoid truncation self.services.utils.debugLogToFile(f"FULL AI RESPONSE THAT FAILED TO PARSE: {result}", "RENDERER") self.services.utils.debugLogToFile(f"RESPONSE LENGTH: {len(result)} characters", "RENDERER") self.logger.warning(f"Raw content that failed to parse: {result}") # Try to fix incomplete JSON by adding missing closing braces openBraces = result.count('{') closeBraces = result.count('}') if openBraces > closeBraces: # JSON is incomplete, add missing closing braces missingBraces = openBraces - closeBraces result = result + '}' * missingBraces self.logger.info(f"Added {missingBraces} missing closing brace(s)") self.logger.debug(f"Fixed JSON: {result}") # Try parsing the fixed JSON try: styles = json.loads(result) self.logger.info("Successfully fixed incomplete JSON") except json.JSONDecodeError as fixError: self.logger.warning(f"Fixed JSON still invalid: {fixError}") self.logger.warning(f"Fixed JSON content: {result}") # Try to extract just the JSON part if it's embedded in text jsonStart = result.find('{') jsonEnd = result.rfind('}') if jsonStart != -1 and jsonEnd != -1 and jsonEnd > jsonStart: jsonPart = result[jsonStart:jsonEnd+1] try: styles = json.loads(jsonPart) self.logger.info("Successfully extracted JSON from explanatory text") except json.JSONDecodeError: self.logger.warning("Could not extract valid JSON from response, using defaults") return defaultStyles else: return defaultStyles else: # Try to extract just the JSON part if it's embedded in text jsonStart = result.find('{') jsonEnd = result.rfind('}') if jsonStart != -1 and jsonEnd != -1 and jsonEnd > jsonStart: jsonPart = result[jsonStart:jsonEnd+1] try: styles = json.loads(jsonPart) self.logger.info("Successfully extracted JSON from explanatory text") except json.JSONDecodeError: self.logger.warning("Could not extract valid JSON from response, using defaults") return defaultStyles else: return defaultStyles # Convert colors to appropriate format styles = self._convertColorsFormat(styles) return styles except Exception as e: self.logger.warning(f"AI styling failed: {str(e)}, using defaults") return defaultStyles def _convertColorsFormat(self, styles: Dict[str, Any]) -> Dict[str, Any]: """ Convert colors to appropriate format based on renderer type. Override this method in subclasses for format-specific color handling. """ return styles def _createAiStyleTemplate(self, formatName: str, userPrompt: str, styleSchema: Dict[str, Any]) -> str: """ Create a standardized AI style template for any format. Args: formatName: Name of the format (e.g., "docx", "xlsx", "pptx") userPrompt: User's original prompt styleSchema: Format-specific style schema Returns: Formatted prompt string """ schemaJson = json.dumps(styleSchema, indent=4) # DEBUG: Show the schema being sent return f"""You are a professional document styling expert. Generate a complete JSON styling configuration for {formatName.upper()} documents. User request: {userPrompt} Use this schema as a template: {schemaJson} Requirements: - Return ONLY the complete JSON object (no markdown, no explanations) - If the user request contains style/formatting/design instructions (in any language), customize the styling accordingly (adapt styles and add styles if needed) - If the user request has NO style instructions, return the default schema values unchanged - Ensure all objects are properly closed with closing braces - Only modify styles if style instructions are present in the user request Return the complete JSON:"""