415 lines
17 KiB
Python
415 lines
17 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CSV renderer for report generation.
"""
|
|
|
|
from .documentRendererBaseTemplate import BaseRenderer
|
|
from modules.datamodels.datamodelDocument import RenderedDocument
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
class RendererCsv(BaseRenderer):
    """Renders content to CSV format with format-specific extraction.

    ``render`` emits one CSV file per usable section: every ``table`` section
    and every ``code_block`` section that contains at least one element whose
    ``content.language`` is ``"csv"``.

    NOTE(review): ``self.logger``, ``_validateJsonStructure``,
    ``_extractSections``, ``_extractMetadata`` and ``_determineFilename`` are
    not defined in this class; they are presumably provided by
    ``BaseRenderer`` - confirm against the base class.
    """

    # Width limit (in characters) used when wrapping long paragraphs into rows.
    _PARAGRAPH_WRAP_WIDTH = 100
    # Maximum length of the sanitized section title embedded in a filename.
    _FILENAME_TITLE_LIMIT = 30

    @classmethod
    def getSupportedFormats(cls) -> List[str]:
        """Return supported CSV formats."""
        return ['csv']

    @classmethod
    def getFormatAliases(cls) -> List[str]:
        """Return format aliases."""
        return ['spreadsheet', 'table']

    @classmethod
    def getPriority(cls) -> int:
        """Return priority for CSV renderer."""
        return 70

    @classmethod
    def getOutputStyle(cls, formatName: Optional[str] = None) -> str:
        """Return output style classification: CSV document renderer converts structured document content to CSV."""
        return 'document'

    @classmethod
    def getAcceptedSectionTypes(cls, formatName: Optional[str] = None) -> List[str]:
        """
        Return list of section content types that CSV renderer accepts.
        CSV renderer accepts table sections and code_block sections (for raw CSV content).
        """
        return ["table", "code_block"]

    @staticmethod
    def _isCsvCodeElement(element: Any) -> bool:
        """Return True when *element* is a dict whose ``content.language`` is "csv" (case-insensitive)."""
        if not isinstance(element, dict):
            return False
        content = element.get("content", {})
        if not isinstance(content, dict):
            return False
        # ``language`` may be present but None; treat that like a missing value.
        return str(content.get("language") or "").lower() == "csv"

    def _buildSectionFilename(self, baseFilename: str, section: Dict[str, Any], index: int, total: int) -> str:
        """Build the output filename for one CSV-producing section.

        Args:
            baseFilename: Filename stem without the ``.csv`` extension.
            section: The section dict (``title`` and ``id`` are read).
            index: Zero-based position of the section among all CSV sections.
            total: Number of CSV sections being rendered.

        Returns:
            ``<base>.csv`` when there is a single section, otherwise
            ``<base>_<sanitized title>.csv`` or ``<base>_<section id>.csv``.
        """
        if total == 1:
            return f"{baseFilename}.csv"

        sectionTitle = section.get("title")
        if sectionTitle:
            safeTitle = "".join(
                c for c in str(sectionTitle) if c.isalnum() or c in (' ', '-', '_')
            ).strip()
            safeTitle = safeTitle.replace(' ', '_')[:self._FILENAME_TITLE_LIMIT]
            # A title made only of stripped characters would give "<base>_.csv";
            # fall through to the id-based name in that case.
            if safeTitle:
                return f"{baseFilename}_{safeTitle}.csv"

        sectionId = section.get("id") or f"csv_{index + 1}"
        return f"{baseFilename}_{sectionId}.csv"

    async def render(self, extractedContent: Dict[str, Any], title: str, userPrompt: Optional[str] = None, aiService=None) -> List[RenderedDocument]:
        """Render extracted JSON content to CSV format. Produces one CSV file per table section.

        Args:
            extractedContent: Structured content; must pass
                ``_validateJsonStructure``.
            title: Document title, used for filenames and the error fallback.
            userPrompt: Unused by this renderer.
            aiService: Unused by this renderer.

        Returns:
            A list of ``RenderedDocument`` objects with ``text/csv`` payloads:
            one per table / CSV code_block section (tables first), a single
            placeholder document when no usable section exists, or a single
            error document when rendering fails. This method does not raise;
            all exceptions are logged and turned into the error document.
        """
        try:
            # Validate JSON structure
            if not self._validateJsonStructure(extractedContent):
                raise ValueError("JSON content must follow standardized schema: {metadata: {...}, documents: [{sections: [...]}]}")

            # Extract sections and metadata
            sections = self._extractSections(extractedContent)
            metadata = self._extractMetadata(extractedContent)
            safeMetadata = metadata if isinstance(metadata, dict) else None
            documentType = safeMetadata.get("documentType") if safeMetadata is not None else None

            # Determine base filename from document or title
            documents = extractedContent.get("documents", [])
            baseFilename = None
            if documents and isinstance(documents[0], dict):
                baseFilename = documents[0].get("filename")
            if not baseFilename:
                baseFilename = self._determineFilename(title, "text/csv")

            # Remove extension from base filename if present (any letter case)
            if baseFilename.lower().endswith('.csv'):
                baseFilename = baseFilename[:-4]

            # Collect CSV-producing sections: table sections AND code_block sections with CSV language
            tableSections = []
            codeBlockCsvSections = []
            for section in sections:
                sectionType = section.get("content_type", "paragraph")
                if sectionType == "table":
                    tableSections.append(section)
                elif sectionType == "code_block":
                    if any(self._isCsvCodeElement(element) for element in (section.get("elements") or [])):
                        codeBlockCsvSections.append(section)

            # If no usable sections found, return empty CSV
            if not tableSections and not codeBlockCsvSections:
                self.logger.warning("No table or CSV code_block sections found in CSV document - returning empty CSV")
                emptyCsv = self._convertRowsToCsv([["No table data available"]])
                return [
                    RenderedDocument(
                        documentData=emptyCsv.encode('utf-8'),
                        mimeType="text/csv",
                        filename=self._determineFilename(title, "text/csv"),
                        documentType=documentType,
                        metadata=safeMetadata
                    )
                ]

            # Tables are emitted before code blocks, regardless of document order.
            allCsvSections = tableSections + codeBlockCsvSections

            # Generate one CSV file per section
            renderedDocuments = []
            usedFilenames = set()
            for index, csvSection in enumerate(allCsvSections):
                sectionTitle = csvSection.get("title")
                elements = csvSection.get("elements") or []

                if csvSection.get("content_type", "paragraph") == "code_block":
                    # Extract raw CSV content directly from code_block elements
                    rawCsvParts = []
                    for element in elements:
                        if self._isCsvCodeElement(element):
                            code = element["content"].get("code", "")
                            if code:
                                rawCsvParts.append(code)
                    csvContent = "\n".join(rawCsvParts)
                else:
                    # Table section - render via table logic
                    csvRows = []
                    if sectionTitle:
                        csvRows.append([sectionTitle])
                        # Spacer row; note _convertRowsToCsv currently drops empty rows.
                        csvRows.append([])
                    for element in elements:
                        tableRows = self._renderJsonTableToCsv(element)
                        if tableRows:
                            csvRows.extend(tableRows)
                    csvContent = self._convertRowsToCsv(csvRows)

                # Determine filename, disambiguating sections that would
                # otherwise map to the same name (e.g. identical titles).
                filename = self._buildSectionFilename(baseFilename, csvSection, index, len(allCsvSections))
                stem = filename[:-4]
                suffix = index + 1
                while filename in usedFilenames:
                    filename = f"{stem}_{suffix}.csv"
                    suffix += 1
                usedFilenames.add(filename)

                renderedDocuments.append(
                    RenderedDocument(
                        documentData=csvContent.encode('utf-8'),
                        mimeType="text/csv",
                        filename=filename,
                        documentType=documentType,
                        metadata=safeMetadata
                    )
                )

            return renderedDocuments

        except Exception as e:
            # Top-level boundary: never propagate, always hand back a document.
            self.logger.error(f"Error rendering CSV: {str(e)}")
            # Return minimal CSV fallback
            fallbackCsv = self._convertRowsToCsv([["Title", "Content"], [title, f"Error rendering report: {str(e)}"]])
            fallbackMetadata = None
            # Guard the type so the handler itself cannot raise on malformed input.
            if isinstance(extractedContent, dict) and extractedContent:
                fallbackMetadata = extractedContent.get("metadata", {})
            return [
                RenderedDocument(
                    documentData=fallbackCsv.encode('utf-8'),
                    mimeType="text/csv",
                    filename=self._determineFilename(title, "text/csv"),
                    metadata=fallbackMetadata
                )
            ]

    async def _generateCsvFromJson(self, jsonContent: Dict[str, Any], title: str) -> str:
        """Generate CSV content from structured JSON document. DEPRECATED: Use render() method instead.

        Builds a single CSV string containing the document title followed by
        every ``table`` section.

        Raises:
            Exception: Wraps any failure as "CSV generation failed: ...", with
                the original exception chained as ``__cause__``.
        """
        # This method is kept for backward compatibility but is no longer used
        # The render() method now handles CSV generation directly
        try:
            # Validate JSON structure (standardized schema: {metadata: {...}, documents: [{sections: [...]}]})
            if not self._validateJsonStructure(jsonContent):
                raise ValueError("JSON content must follow standardized schema: {metadata: {...}, documents: [{sections: [...]}]}")

            # Extract sections and metadata from standardized schema
            sections = self._extractSections(jsonContent)
            metadata = self._extractMetadata(jsonContent)

            # Use provided title (which comes from documents[].title) as primary source
            # Fallback to metadata.title only if title parameter is empty
            if title:
                documentTitle = title
            elif isinstance(metadata, dict):
                documentTitle = metadata.get("title", "Generated Document")
            else:
                documentTitle = "Generated Document"

            # Generate CSV content
            csvRows = []

            # Add title row
            if documentTitle:
                csvRows.append([documentTitle])
                csvRows.append([])  # Spacer row (dropped by _convertRowsToCsv)

            # Process each section in order - only table sections
            for section in sections:
                if section.get("content_type", "paragraph") != "table":
                    continue
                sectionCsv = self._renderJsonSectionToCsv(section)
                if sectionCsv:
                    csvRows.extend(sectionCsv)
                    csvRows.append([])  # Spacer row between sections (dropped by _convertRowsToCsv)

            # Convert to CSV string
            return self._convertRowsToCsv(csvRows)

        except Exception as e:
            self.logger.error(f"Error generating CSV from JSON: {str(e)}")
            raise Exception(f"CSV generation failed: {str(e)}") from e

    def _renderJsonSectionToCsv(self, section: Dict[str, Any]) -> List[List[str]]:
        """Render a single JSON section to CSV rows.

        The section's ``content_type`` selects the element renderer; unknown
        types fall back to the paragraph renderer. Both ``"code"`` and
        ``"code_block"`` map to the code renderer, matching the type name used
        by ``getAcceptedSectionTypes`` and ``render``.

        Returns:
            The rows for the section (an optional ``# <title>`` row first), or
            ``[["[Error rendering section]"]]`` if rendering fails.
        """
        try:
            sectionType = section.get("content_type", "paragraph")
            elements = section.get("elements") or []

            elementRenderers = {
                "table": self._renderJsonTableToCsv,
                "list": self._renderJsonListToCsv,
                "heading": self._renderJsonHeadingToCsv,
                "paragraph": self._renderJsonParagraphToCsv,
                "code": self._renderJsonCodeToCsv,
                "code_block": self._renderJsonCodeToCsv,
            }
            renderElement = elementRenderers.get(sectionType) if isinstance(sectionType, str) else None
            if renderElement is None:
                # Fallback to paragraph for unknown types
                renderElement = self._renderJsonParagraphToCsv

            csvRows = []

            # Add section title if available
            sectionTitle = section.get("title")
            if sectionTitle:
                csvRows.append([f"# {sectionTitle}"])

            # Process each element in the section
            for element in elements:
                csvRows.extend(renderElement(element))

            return csvRows

        except Exception as e:
            sectionId = section.get('id', 'unknown') if isinstance(section, dict) else 'unknown'
            self.logger.warning(f"Error rendering section {sectionId}: {str(e)}")
            return [["[Error rendering section]"]]

    def _renderJsonTableToCsv(self, tableData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON table to CSV rows.

        Reads ``content.headers`` and ``content.rows`` from *tableData*.

        Returns:
            The header row (if any) followed by the data rows; ``[]`` when
            ``content`` is not a dict; ``[["[Error rendering table]"]]`` on
            failure.
        """
        try:
            # Extract from nested content structure
            content = tableData.get("content", {})
            if not isinstance(content, dict):
                return []

            headers = content.get("headers", [])
            rows = content.get("rows", [])

            csvRows = []
            if headers:
                csvRows.append(headers)
            if rows:
                csvRows.extend(rows)
            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering table: {str(e)}")
            return [["[Error rendering table]"]]

    def _renderJsonListToCsv(self, listData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON list to CSV rows.

        Each item in ``content.items`` becomes a one-cell row. Dict items
        contribute their ``text`` plus one " - "-prefixed row per subitem;
        other items are stringified.
        """
        try:
            # Extract from nested content structure
            content = listData.get("content", {})
            if not isinstance(content, dict):
                return []

            csvRows = []
            for item in content.get("items", []):
                if not isinstance(item, dict):
                    csvRows.append([str(item)])
                    continue

                csvRows.append([item.get("text", "")])

                # Add subitems as indented rows
                for subitem in item.get("subitems") or []:
                    subText = subitem.get('text', '') if isinstance(subitem, dict) else subitem
                    csvRows.append([f" - {subText}"])

            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering list: {str(e)}")
            return [["[Error rendering list]"]]

    def _renderJsonHeadingToCsv(self, headingData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON heading to CSV rows.

        Produces a single row of the form ``"## text"`` where the number of
        ``#`` characters is ``content.level``. A missing, non-numeric or
        non-positive level is treated as 1.
        """
        try:
            # Extract from nested content structure
            content = headingData.get("content", {})
            if not isinstance(content, dict):
                return []

            text = content.get("text", "")
            if not text:
                return []

            try:
                level = int(content.get("level", 1))
            except (TypeError, ValueError):
                level = 1
            level = max(level, 1)

            # Use # symbols for heading levels
            return [[f"{'#' * level} {text}"]]

        except Exception as e:
            self.logger.warning(f"Error rendering heading: {str(e)}")
            return [["[Error rendering heading]"]]

    def _renderJsonParagraphToCsv(self, paragraphData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON paragraph to CSV rows.

        ``content`` may be a dict with a ``text`` key or a plain string. Text
        of at most 100 characters becomes one row. Longer text is split on
        whitespace and greedily packed into rows of at most 100 characters; a
        single word longer than the limit is kept whole on its own row.
        """
        try:
            # Extract from nested content structure
            content = paragraphData.get("content", {})
            if isinstance(content, dict):
                text = content.get("text", "")
            elif isinstance(content, str):
                text = content
            else:
                text = ""

            if not text:
                return []

            width = self._PARAGRAPH_WRAP_WIDTH
            if len(text) <= width:
                return [[text]]

            # Split long paragraphs into multiple rows
            rows = []
            currentWords = []
            currentLength = 0  # always equals len(" ".join(currentWords))
            for word in text.split():
                # Length of the row if this word were appended (with separator).
                projected = currentLength + 1 + len(word) if currentWords else len(word)
                if currentWords and projected > width:
                    rows.append([" ".join(currentWords)])
                    currentWords = [word]
                    currentLength = len(word)
                else:
                    currentWords.append(word)
                    currentLength = projected

            if currentWords:
                rows.append([" ".join(currentWords)])

            return rows

        except Exception as e:
            self.logger.warning(f"Error rendering paragraph: {str(e)}")
            return [["[Error rendering paragraph]"]]

    def _renderJsonCodeToCsv(self, codeData: Dict[str, Any]) -> List[List[str]]:
        """Render a JSON code block to CSV rows.

        Emits an optional ``Code (<language>):`` row followed by one row per
        line of ``content.code``, each prefixed with a single space.
        """
        try:
            # Extract from nested content structure
            content = codeData.get("content", {})
            if not isinstance(content, dict):
                return []

            code = content.get("code", "")
            language = content.get("language", "")

            csvRows = []
            if language:
                csvRows.append([f"Code ({language}):"])
            if code:
                # One row per line of code
                csvRows.extend([f" {line}"] for line in code.split('\n'))
            return csvRows

        except Exception as e:
            self.logger.warning(f"Error rendering code block: {str(e)}")
            return [["[Error rendering code block]"]]

    def _convertRowsToCsv(self, rows: List[List[str]]) -> str:
        """Convert rows to CSV string.

        Rows are written with ``csv.writer`` using its default dialect.
        Empty rows are skipped, so ``[]`` spacer rows appended by callers do
        not appear in the output.
        """
        import csv
        import io

        output = io.StringIO()
        writer = csv.writer(output)
        # Only write non-empty rows
        writer.writerows(row for row in rows if row)
        return output.getvalue()

    def _cleanCsvContent(self, content: str, title: str) -> str:
        """Clean and validate CSV content from AI.

        Strips surrounding whitespace and, when the text both starts and ends
        with a Markdown code fence and spans more than two lines, removes the
        first and last lines. *title* is accepted but not used.
        """
        content = content.strip()

        # Remove markdown code blocks if present
        if content.startswith("```") and content.endswith("```"):
            lines = content.split('\n')
            if len(lines) > 2:
                content = '\n'.join(lines[1:-1]).strip()

        return content
|
|
|