# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CSV renderer for report generation.
"""

from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional


class RendererCsv(BaseRenderer):
    """Renders content to CSV format with format-specific extraction.

    ``render`` emits one CSV document per CSV-producing section: every
    ``table`` section, plus every ``code_block`` section that contains at
    least one element whose ``content.language`` is ``"csv"``.
    """

    @classmethod
    def getSupportedFormats(cls) -> List[str]:
        """Return supported CSV formats."""
        return ['csv']

    @classmethod
    def getFormatAliases(cls) -> List[str]:
        """Return format aliases."""
        return ['spreadsheet', 'table']

    @classmethod
    def getPriority(cls) -> int:
        """Return priority for CSV renderer."""
        return 70

    @classmethod
    def getOutputStyle(cls, formatName: Optional[str] = None) -> str:
        """Return output style classification.

        The CSV renderer converts structured document content to CSV, so the
        style is always ``'document'``; ``formatName`` is not consulted.
        """
        return 'document'

    @classmethod
    def getAcceptedSectionTypes(cls, formatName: Optional[str] = None) -> List[str]:
        """Return the section content types that the CSV renderer accepts.

        Table sections and code_block sections (for raw CSV content) are
        accepted; ``formatName`` is not consulted.
        """
        return ["table", "code_block"]

    async def render(
        self,
        extractedContent: Dict[str, Any],
        title: str,
        userPrompt: Optional[str] = None,
        aiService=None,
    ) -> List[RenderedDocument]:
        """Render extracted JSON content to CSV format.

        Produces one CSV file per table section and per CSV code_block
        section. ``userPrompt`` and ``aiService`` are accepted for interface
        compatibility and are not used here.

        Args:
            extractedContent: Document JSON; must pass
                ``self._validateJsonStructure``.
            title: Title used to derive a filename when the first document
                entry carries no ``filename``, and used in the error fallback.
            userPrompt: Unused.
            aiService: Unused.

        Returns:
            A list of ``RenderedDocument`` objects with ``text/csv`` payloads.
            If no CSV-producing section exists, a single placeholder document
            is returned. If anything raises, a single two-row fallback
            document describing the error is returned instead of propagating.
        """
        try:
            # Validate JSON structure
            if not self._validateJsonStructure(extractedContent):
                raise ValueError("JSON content must follow standardized schema: {metadata: {...}, documents: [{sections: [...]}]}")

            # Extract sections and metadata
            sections = self._extractSections(extractedContent)
            metadata = self._extractMetadata(extractedContent)
            docMetadata = metadata if isinstance(metadata, dict) else None
            documentType = docMetadata.get("documentType") if docMetadata is not None else None

            # Determine base filename from document or title
            documents = extractedContent.get("documents", [])
            baseFilename = None
            if documents and isinstance(documents[0], dict):
                baseFilename = documents[0].get("filename")
            if not baseFilename:
                baseFilename = self._determineFilename(title, "text/csv")

            # Remove the extension (any letter case) so it is not doubled
            # when ".csv" is appended again below.
            if baseFilename.lower().endswith('.csv'):
                baseFilename = baseFilename[:-4]

            # Collect CSV-producing sections: table sections AND code_block
            # sections holding at least one element tagged with language "csv".
            tableSections = []
            codeBlockCsvSections = []
            for section in sections:
                sectionType = section.get("content_type", "paragraph")
                if sectionType == "table":
                    tableSections.append(section)
                elif sectionType == "code_block" and any(
                    self._isCsvCodeElement(element)
                    for element in section.get("elements") or []
                ):
                    codeBlockCsvSections.append(section)

            # If no usable sections found, return a placeholder CSV
            if not tableSections and not codeBlockCsvSections:
                self.logger.warning("No table or CSV code_block sections found in CSV document - returning empty CSV")
                emptyCsv = self._convertRowsToCsv([["No table data available"]])
                return [
                    RenderedDocument(
                        documentData=emptyCsv.encode('utf-8'),
                        mimeType="text/csv",
                        filename=self._determineFilename(title, "text/csv"),
                        documentType=documentType,
                        metadata=docMetadata
                    )
                ]

            # Tables first, then CSV code blocks (same order as before).
            allCsvSections = tableSections + codeBlockCsvSections
            singleOutput = len(allCsvSections) == 1
            usedFilenames = set()

            # Generate one CSV file per section
            renderedDocuments = []
            for i, csvSection in enumerate(allCsvSections):
                if csvSection.get("content_type", "paragraph") == "code_block":
                    csvContent = self._extractRawCsvFromCodeBlock(csvSection)
                else:
                    csvContent = self._renderTableSectionToCsvText(csvSection)

                if singleOutput:
                    filename = f"{baseFilename}.csv"
                else:
                    filename = self._buildSectionCsvFilename(
                        baseFilename, csvSection, i, usedFilenames
                    )

                renderedDocuments.append(
                    RenderedDocument(
                        documentData=csvContent.encode('utf-8'),
                        mimeType="text/csv",
                        filename=filename,
                        documentType=documentType,
                        metadata=docMetadata
                    )
                )

            return renderedDocuments

        except Exception as e:
            self.logger.error(f"Error rendering CSV: {str(e)}")
            # Return minimal CSV fallback. The metadata lookup is guarded so
            # that a non-dict payload cannot raise from inside this handler.
            fallbackCsv = self._convertRowsToCsv([["Title", "Content"], [title, f"Error rendering report: {str(e)}"]])
            fallbackMetadata = None
            if extractedContent and isinstance(extractedContent, dict):
                fallbackMetadata = extractedContent.get("metadata", {})
            return [
                RenderedDocument(
                    documentData=fallbackCsv.encode('utf-8'),
                    mimeType="text/csv",
                    filename=self._determineFilename(title, "text/csv"),
                    metadata=fallbackMetadata
                )
            ]

    @staticmethod
    def _isCsvCodeElement(element: Any) -> bool:
        """Return True if ``element`` is a dict whose content language is "csv".

        Non-dict elements, non-dict content and non-string languages are
        treated as "not CSV" instead of raising.
        """
        if not isinstance(element, dict):
            return False
        content = element.get("content", {})
        if not isinstance(content, dict):
            return False
        language = content.get("language", "")
        return isinstance(language, str) and language.lower() == "csv"

    def _extractRawCsvFromCodeBlock(self, section: Dict[str, Any]) -> str:
        """Join the raw ``code`` text of every CSV element in a code_block section."""
        rawCsvParts = []
        for element in section.get("elements") or []:
            if not self._isCsvCodeElement(element):
                continue
            code = element["content"].get("code", "")
            # Only non-empty strings can be joined into the CSV text.
            if code and isinstance(code, str):
                rawCsvParts.append(code)
        return "\n".join(rawCsvParts)

    def _renderTableSectionToCsvText(self, section: Dict[str, Any]) -> str:
        """Render a table section (optional title row plus table rows) to CSV text."""
        csvRows = []
        sectionTitle = section.get("title")
        if sectionTitle:
            csvRows.append([sectionTitle])
            # NOTE: _convertRowsToCsv skips empty rows, so this separator is
            # currently not emitted in the output.
            csvRows.append([])  # Empty row after title

        for element in section.get("elements") or []:
            tableRows = self._renderJsonTableToCsv(element)
            if tableRows:
                csvRows.extend(tableRows)

        return self._convertRowsToCsv(csvRows)

    @staticmethod
    def _buildSectionCsvFilename(
        baseFilename: str,
        section: Dict[str, Any],
        index: int,
        usedFilenames: set,
    ) -> str:
        """Build a per-section filename that is unique within ``usedFilenames``.

        The suffix is the sanitized section title (alphanumerics, '-', '_',
        spaces turned into '_', at most 30 characters). When the section has
        no title, or the title sanitizes to nothing, the section ``id`` is
        used, falling back to ``csv_<index + 1>``. A numeric suffix is
        appended when the resulting name was already handed out. The chosen
        name is added to ``usedFilenames``.
        """
        suffix = ""
        sectionTitle = section.get("title")
        if sectionTitle:
            safeTitle = "".join(
                c for c in str(sectionTitle) if c.isalnum() or c in (' ', '-', '_')
            ).strip()
            suffix = safeTitle.replace(' ', '_')[:30]
        if not suffix:
            suffix = str(section.get("id") or f"csv_{index + 1}")

        stem = f"{baseFilename}_{suffix}"
        filename = f"{stem}.csv"
        counter = 2
        while filename in usedFilenames:
            filename = f"{stem}_{counter}.csv"
            counter += 1
        usedFilenames.add(filename)
        return filename

    async def _generateCsvFromJson(self, jsonContent: Dict[str, Any], title: str) -> str:
        """Generate CSV content from structured JSON document.

        DEPRECATED: Use render() method instead. Kept for backward
        compatibility; render() handles CSV generation directly.

        Only ``table`` sections are rendered, preceded by a title row.

        Raises:
            Exception: Wraps any failure as "CSV generation failed: ...",
                chained to the original exception.
        """
        try:
            # Validate JSON structure (standardized schema: {metadata: {...}, documents: [{sections: [...]}]})
            if not self._validateJsonStructure(jsonContent):
                raise ValueError("JSON content must follow standardized schema: {metadata: {...}, documents: [{sections: [...]}]}")

            # Extract sections and metadata from standardized schema
            sections = self._extractSections(jsonContent)
            metadata = self._extractMetadata(jsonContent)

            # Use provided title (which comes from documents[].title) as primary source
            # Fallback to metadata.title only if title parameter is empty
            if title:
                documentTitle = title
            elif isinstance(metadata, dict):
                documentTitle = metadata.get("title", "Generated Document")
            else:
                documentTitle = "Generated Document"

            csvRows = []

            # Add title row. NOTE: the empty separator rows appended below are
            # skipped by _convertRowsToCsv, so they do not reach the output.
            if documentTitle:
                csvRows.append([documentTitle])
                csvRows.append([])  # Empty row

            # Process each section in order - only table sections
            for section in sections:
                if section.get("content_type", "paragraph") != "table":
                    continue
                sectionCsv = self._renderJsonSectionToCsv(section)
                if sectionCsv:
                    csvRows.extend(sectionCsv)
                    csvRows.append([])  # Empty row between sections

            # Convert to CSV string
            return self._convertRowsToCsv(csvRows)

        except Exception as e:
            self.logger.error(f"Error generating CSV from JSON: {str(e)}")
            raise Exception(f"CSV generation failed: {str(e)}") from e

    def _renderJsonSectionToCsv(self, section: Dict[str, Any]) -> List[List[str]]:
        """Render a single JSON section to CSV rows.

        A ``# <title>`` row is emitted first when the section has a title.
        Each element is then rendered by the renderer matching the section's
        ``content_type``; unknown types fall back to paragraph rendering.
        Both ``"code"`` and ``"code_block"`` select the code renderer, since
        ``"code_block"`` is the type name used elsewhere in this class.

        Returns:
            The CSV rows, or a single "[Error rendering section]" row if
            rendering raises.
        """
        try:
            sectionType = section.get("content_type", "paragraph")
            elements = section.get("elements") or []

            elementRenderers = {
                "table": self._renderJsonTableToCsv,
                "list": self._renderJsonListToCsv,
                "heading": self._renderJsonHeadingToCsv,
                "paragraph": self._renderJsonParagraphToCsv,
                "code": self._renderJsonCodeToCsv,
                "code_block": self._renderJsonCodeToCsv,
            }
            # Fallback to paragraph for unknown types
            renderElement = elementRenderers.get(sectionType, self._renderJsonParagraphToCsv)

            csvRows = []

            # Add section title if available
            sectionTitle = section.get("title")
            if sectionTitle:
                csvRows.append([f"# {sectionTitle}"])

            # Process each element in the section
            for element in elements:
                csvRows.extend(renderElement(element))

            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering section {section.get('id', 'unknown')}: {str(e)}")
            return [["[Error rendering section]"]]

    def _renderJsonTableToCsv(self, tableData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON table to CSV rows.

        Reads ``headers`` and ``rows`` from the element's nested ``content``
        dict. The header row (if any) comes first, followed by the data rows.

        Returns:
            The rows, ``[]`` when ``content`` is not a dict, or a single
            "[Error rendering table]" row if rendering raises.
        """
        try:
            # Extract from nested content structure
            content = tableData.get("content", {})
            if not isinstance(content, dict):
                return []

            headers = content.get("headers", [])
            rows = content.get("rows", [])

            csvRows = []
            if headers:
                csvRows.append(headers)
            if rows:
                csvRows.extend(rows)
            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering table: {str(e)}")
            return [["[Error rendering table]"]]

    def _renderJsonListToCsv(self, listData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON list to CSV rows.

        Each item becomes a one-cell row. Dict items contribute their
        ``text`` followed by one " - "-prefixed row per subitem; any other
        item is written via ``str()``. Missing or ``None`` ``items`` /
        ``subitems`` are treated as empty.

        Returns:
            The rows, ``[]`` when ``content`` is not a dict, or a single
            "[Error rendering list]" row if rendering raises.
        """
        try:
            # Extract from nested content structure
            content = listData.get("content", {})
            if not isinstance(content, dict):
                return []

            csvRows = []
            for item in content.get("items") or []:
                if not isinstance(item, dict):
                    csvRows.append([str(item)])
                    continue

                csvRows.append([item.get("text", "")])
                # Add subitems as indented rows
                for subitem in item.get("subitems") or []:
                    if isinstance(subitem, dict):
                        csvRows.append([f" - {subitem.get('text', '')}"])
                    else:
                        csvRows.append([f" - {subitem}"])

            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering list: {str(e)}")
            return [["[Error rendering list]"]]

    def _renderJsonHeadingToCsv(self, headingData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON heading to CSV rows.

        The heading text is prefixed with one ``#`` per level. ``level`` is
        coerced to an int (defaulting to 1 when missing or not convertible)
        and clamped to at least 1 so the prefix is never empty.

        Returns:
            A single one-cell row, ``[]`` when there is no text or
            ``content`` is not a dict, or a single
            "[Error rendering heading]" row if rendering raises.
        """
        try:
            # Extract from nested content structure
            content = headingData.get("content", {})
            if not isinstance(content, dict):
                return []

            text = content.get("text", "")
            if not text:
                return []

            try:
                level = int(content.get("level", 1))
            except (TypeError, ValueError):
                level = 1
            level = max(level, 1)

            # Use # symbols for heading levels
            return [[f"{'#' * level} {text}"]]

        except Exception as e:
            self.logger.warning(f"Error rendering heading: {str(e)}")
            return [["[Error rendering heading]"]]

    def _renderJsonParagraphToCsv(self, paragraphData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON paragraph to CSV rows.

        ``content`` may be a dict (its ``text`` is used) or a plain string.
        Text of up to 100 characters becomes one row; longer text is split
        on whitespace and greedily packed into rows of roughly 100
        characters, one cell per row.

        Returns:
            The rows, ``[]`` when there is no text, or a single
            "[Error rendering paragraph]" row if rendering raises.
        """
        try:
            # Extract from nested content structure
            content = paragraphData.get("content", {})
            if isinstance(content, dict):
                text = content.get("text", "")
            elif isinstance(content, str):
                text = content
            else:
                text = ""

            if not text:
                return []
            if len(text) <= 100:
                return [[text]]

            # Split long paragraphs into multiple rows
            rows = []
            currentRow = []
            currentLength = 0
            for word in text.split():
                if currentLength + len(word) > 100 and currentRow:
                    rows.append([" ".join(currentRow)])
                    currentRow = [word]
                    currentLength = len(word)
                else:
                    currentRow.append(word)
                    currentLength += len(word) + 1  # +1 for the joining space

            if currentRow:
                rows.append([" ".join(currentRow)])
            return rows

        except Exception as e:
            self.logger.warning(f"Error rendering paragraph: {str(e)}")
            return [["[Error rendering paragraph]"]]

    def _renderJsonCodeToCsv(self, codeData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON code block to CSV rows.

        Emits an optional ``Code (<language>):`` row followed by one
        space-prefixed row per line of ``code``.

        Returns:
            The rows, ``[]`` when ``content`` is not a dict, or a single
            "[Error rendering code block]" row if rendering raises.
        """
        try:
            # Extract from nested content structure
            content = codeData.get("content", {})
            if not isinstance(content, dict):
                return []

            code = content.get("code", "")
            language = content.get("language", "")

            csvRows = []
            if language:
                csvRows.append([f"Code ({language}):"])
            if code:
                # One row per line of code
                csvRows.extend([f" {line}"] for line in code.split('\n'))
            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering code block: {str(e)}")
            return [["[Error rendering code block]"]]

    def _convertRowsToCsv(self, rows: List[List[str]]) -> str:
        """Convert rows to a CSV string using ``csv.writer`` defaults.

        Empty rows are skipped, so callers cannot use ``[]`` to emit a blank
        separator line.
        """
        import csv
        import io

        output = io.StringIO()
        writer = csv.writer(output)
        # Only write non-empty rows
        writer.writerows(row for row in rows if row)
        return output.getvalue()

    def _cleanCsvContent(self, content: str, title: str) -> str:
        """Clean CSV content from AI by stripping a surrounding markdown fence.

        The text is stripped of outer whitespace. If it both starts and ends
        with a triple backtick and spans more than two lines, the first and
        last lines are removed. ``title`` is accepted but not used.
        """
        content = content.strip()

        # Remove markdown code blocks if present
        if content.startswith("```") and content.endswith("```"):
            lines = content.split('\n')
            if len(lines) > 2:
                content = '\n'.join(lines[1:-1]).strip()

        return content