# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CSV code renderer for code generation.
"""

from .codeRendererBaseTemplate import BaseCodeRenderer
|
|
from modules.datamodels.datamodelDocument import RenderedDocument
|
|
from typing import Dict, Any, List, Optional
|
|
import csv
|
|
import io
|
|
|
|
class RendererCodeCsv(BaseCodeRenderer):
    """Renders CSV code files.

    Each incoming code file is structurally normalized (blank lines dropped,
    every row padded or truncated to the header width) and wrapped in a
    ``RenderedDocument`` with MIME type ``text/csv``.

    NOTE(review): ``self.logger``, ``self.services`` and
    ``self._validateCodeFile`` are not defined in this class; presumably they
    are provided by ``BaseCodeRenderer`` -- confirm against the base class.
    """

    @classmethod
    def getSupportedFormats(cls) -> List[str]:
        """Return supported CSV formats."""
        return ['csv']

    @classmethod
    def getFormatAliases(cls) -> List[str]:
        """Return format aliases (none for CSV)."""
        return []

    @classmethod
    def getPriority(cls) -> int:
        """Return priority for CSV code renderer."""
        return 75  # Higher than document renderer (70) for code generation

    @classmethod
    def getOutputStyle(cls, formatName: Optional[str] = None) -> str:
        """Return output style classification: CSV requires specific structure.

        ``formatName`` is accepted for interface compatibility but is not
        consulted; the result is always ``'code'``.
        """
        return 'code'

    async def renderCodeFiles(
        self,
        codeFiles: List[Dict[str, Any]],
        metadata: Dict[str, Any],
        userPrompt: Optional[str] = None
    ) -> List[RenderedDocument]:
        """
        Render CSV code files.

        Every file is rendered independently: its content is normalized by
        ``_validateAndFixCsv`` and statistics from ``_extractCsvStatistics``
        are attached to the document metadata.

        Args:
            codeFiles: File dicts; each valid entry must provide the keys
                ``'filename'`` and ``'content'``. ``None`` is treated as an
                empty list.
            metadata: Project-level metadata copied into every document's
                metadata (may be empty or ``None``).
            userPrompt: Accepted for interface compatibility; not used here.

        Returns:
            One ``RenderedDocument`` per file that passed
            ``self._validateCodeFile``; rejected files are logged and skipped.
        """
        renderedDocs: List[RenderedDocument] = []

        for codeFile in codeFiles or []:
            if not self._validateCodeFile(codeFile):
                # Guard the lookup: an invalid entry may not even be a dict.
                badName = codeFile.get('filename', 'unknown') if isinstance(codeFile, dict) else 'unknown'
                self.logger.warning(f"Invalid code file: {badName}")
                continue

            filename = codeFile['filename']
            content = codeFile['content']

            # Validate CSV structure (header row, consistent columns)
            validatedContent = self._validateAndFixCsv(content)

            # Extract CSV statistics for validation
            csvStats = self._extractCsvStatistics(validatedContent)

            # Merge file-specific metadata with project metadata; copy first so
            # the caller's dict is never mutated and files do not share state.
            fileMetadata = dict(metadata) if metadata else {}
            fileMetadata.update({
                "filename": filename,
                "fileType": "csv",
                "statistics": csvStats
            })

            renderedDocs.append(
                RenderedDocument(
                    documentData=validatedContent.encode('utf-8'),
                    mimeType="text/csv",
                    filename=filename,
                    metadata=fileMetadata
                )
            )

        return renderedDocs

    async def render(
        self,
        extractedContent: Dict[str, Any],
        title: str,
        userPrompt: Optional[str] = None,
        aiService=None,
        *,
        style: Optional[Dict[str, Any]] = None
    ) -> List[RenderedDocument]:
        """
        Render method for document generation compatibility.

        If ``extractedContent`` has a ``"files"`` key, the code generation
        path (``renderCodeFiles``) is used with ``extractedContent["files"]``
        and ``extractedContent["metadata"]``. Otherwise rendering is delegated
        to ``RendererCsv``.

        NOTE(review): ``style`` is accepted but neither used nor forwarded to
        the document renderer -- confirm whether ``RendererCsv.render``
        should receive it.
        """
        # Check if this is code generation (has files array) or document generation (has documents array)
        if "files" in extractedContent:
            # Code generation path - use renderCodeFiles
            files = extractedContent.get("files", [])
            metadata = extractedContent.get("metadata", {})
            return await self.renderCodeFiles(files, metadata, userPrompt)

        # Document generation path - delegate to document renderer.
        # Imported locally, as in the original code.
        from .rendererCsv import RendererCsv
        documentRenderer = RendererCsv(self.services)
        return await documentRenderer.render(extractedContent, title, userPrompt, aiService)

    def _validateAndFixCsv(self, content: str) -> str:
        """Validate CSV structure and fix common issues.

        The first non-blank row is taken as the header. Every following row
        is padded with empty strings or truncated so that it has exactly as
        many columns as the header. Blank lines are dropped: ``csv.reader``
        reports them as empty lists, which would otherwise become a
        zero-column header (truncating all data) or bogus all-empty rows.

        Args:
            content: Raw CSV text.

        Returns:
            The re-serialized CSV text as written by ``csv.writer`` with its
            default dialect. ``content`` is returned unchanged when it holds
            no non-blank rows or when parsing fails.
        """
        try:
            # Parse CSV to validate structure, ignoring blank lines
            rows = [row for row in csv.reader(io.StringIO(content)) if row]

            if not rows:
                return content  # Empty CSV

            headerRow = rows[0]
            headerCount = len(headerRow)

            fixedRows = [headerRow]  # Start with header

            for i, row in enumerate(rows[1:], 1):
                if len(row) != headerCount:
                    self.logger.debug(f"Row {i} has {len(row)} columns, expected {headerCount}. Auto-fixing...")
                    # Pad or truncate to match header
                    if len(row) < headerCount:
                        row = row + [''] * (headerCount - len(row))
                    else:
                        row = row[:headerCount]
                fixedRows.append(row)

            # Convert back to CSV string
            output = io.StringIO()
            csv.writer(output).writerows(fixedRows)
            return output.getvalue()

        except Exception as e:
            # Deliberate best-effort: never fail rendering because of validation.
            self.logger.warning(f"CSV validation failed: {e}, returning original content")
            return content

    def _extractCsvStatistics(self, content: str) -> Dict[str, Any]:
        """Extract CSV statistics for validation (row count, column count, headers).

        Blank lines are ignored, consistent with ``_validateAndFixCsv``.

        Args:
            content: CSV text.

        Returns:
            A dict with ``rowCount`` and ``dataRowCount`` (both the number of
            rows after the header), ``columnCount`` (header width) and
            ``headerRow``. All counts are zero and ``headerRow`` is empty
            when there are no rows. An empty dict is returned if parsing
            fails.
        """
        try:
            rows = [row for row in csv.reader(io.StringIO(content)) if row]

            if not rows:
                # Same keys as the populated result so consumers can rely on them.
                return {"rowCount": 0, "columnCount": 0, "headerRow": [], "dataRowCount": 0}

            headerRow = rows[0]
            rowCount = len(rows) - 1  # Exclude header

            return {
                "rowCount": rowCount,
                "columnCount": len(headerRow),
                "headerRow": headerRow,
                "dataRowCount": rowCount
            }
        except Exception as e:
            self.logger.warning(f"CSV statistics extraction failed: {e}")
            return {}