Office renderers updated to JSON model

ValueOn AG 2025-10-11 23:39:13 +02:00
parent 87dec2c4a2
commit a26553c34c
16 changed files with 3156 additions and 438 deletions

View file

@ -0,0 +1,125 @@
from typing import Any, Dict, List, Optional, Literal, Union
from pydantic import BaseModel, Field
from datetime import datetime
class DocumentMetadata(BaseModel):
"""Metadata for the entire document."""
title: str = Field(description="Document title")
author: Optional[str] = Field(default=None, description="Document author")
created_at: datetime = Field(default_factory=datetime.now, description="Creation timestamp")
source_documents: List[str] = Field(default_factory=list, description="Source document IDs")
extraction_method: str = Field(default="ai_extraction", description="Method used for extraction")
version: str = Field(default="1.0", description="Document version")
class TableData(BaseModel):
"""Structured table data."""
headers: List[str] = Field(description="Table column headers")
rows: List[List[str]] = Field(description="Table data rows")
caption: Optional[str] = Field(default=None, description="Table caption")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Table metadata")
class ListItem(BaseModel):
"""Individual list item with optional sub-items."""
text: str = Field(description="List item text")
subitems: Optional[List['ListItem']] = Field(default=None, description="Nested sub-items")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Item metadata")
class BulletList(BaseModel):
"""Bulleted or numbered list."""
items: List[ListItem] = Field(description="List items")
list_type: Literal["bullet", "numbered", "checklist"] = Field(default="bullet", description="List type")
metadata: Dict[str, Any] = Field(default_factory=dict, description="List metadata")
class Paragraph(BaseModel):
"""Text paragraph with optional formatting."""
text: str = Field(description="Paragraph text")
formatting: Optional[Dict[str, Any]] = Field(default=None, description="Text formatting (bold, italic, etc.)")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Paragraph metadata")
class Heading(BaseModel):
"""Document heading."""
text: str = Field(description="Heading text")
level: int = Field(ge=1, le=6, description="Heading level (1-6)")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Heading metadata")
class CodeBlock(BaseModel):
"""Code block with syntax highlighting."""
code: str = Field(description="Code content")
language: Optional[str] = Field(default=None, description="Programming language")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Code block metadata")
class Image(BaseModel):
"""Image with metadata."""
data: str = Field(description="Base64 encoded image data")
alt_text: Optional[str] = Field(default=None, description="Alternative text")
caption: Optional[str] = Field(default=None, description="Image caption")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Image metadata")
class DocumentSection(BaseModel):
"""A section of the document containing one or more content elements."""
id: str = Field(description="Unique section identifier")
title: Optional[str] = Field(default=None, description="Section title")
content_type: Literal["table", "list", "paragraph", "heading", "code", "image", "mixed"] = Field(description="Primary content type")
elements: List[Union[TableData, BulletList, Paragraph, Heading, CodeBlock, Image]] = Field(description="Content elements in this section")
order: int = Field(description="Section order in document")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Section metadata")
class StructuredDocument(BaseModel):
"""Complete structured document in JSON format."""
metadata: DocumentMetadata = Field(description="Document metadata")
sections: List[DocumentSection] = Field(description="Document sections")
summary: Optional[str] = Field(default=None, description="Document summary")
tags: List[str] = Field(default_factory=list, description="Document tags")
def get_sections_by_type(self, content_type: str) -> List[DocumentSection]:
"""Get all sections of a specific content type."""
return [section for section in self.sections if section.content_type == content_type]
def get_all_tables(self) -> List[TableData]:
"""Get all table data from the document."""
tables = []
for section in self.sections:
for element in section.elements:
if isinstance(element, TableData):
tables.append(element)
return tables
def get_all_lists(self) -> List[BulletList]:
"""Get all lists from the document."""
lists = []
for section in self.sections:
for element in section.elements:
if isinstance(element, BulletList):
lists.append(element)
return lists
class JsonChunkResult(BaseModel):
"""Result from processing a single chunk with JSON output."""
chunk_id: str = Field(description="Chunk identifier")
document_section: DocumentSection = Field(description="Structured content from this chunk")
processing_time: float = Field(description="Processing time in seconds")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Chunk processing metadata")
class JsonMergeResult(BaseModel):
"""Result from merging multiple JSON chunks."""
merged_document: StructuredDocument = Field(description="Merged structured document")
merge_strategy: str = Field(description="Strategy used for merging")
chunks_processed: int = Field(description="Number of chunks processed")
merge_time: float = Field(description="Time taken to merge chunks")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Merge process metadata")
# Update forward references
ListItem.model_rebuild()
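For reference, a minimal usage sketch of the model above (assumes the classes defined in this file are in scope; illustrative only, not part of the committed file):

# Build a one-section document containing a single table and read it back.
table = TableData(
    headers=["Item", "Status"],
    rows=[["Renderer update", "done"], ["JSON merge", "done"]],
)
section = DocumentSection(
    id="sec-1",
    title="Overview",
    content_type="table",
    elements=[table],
    order=0,
)
doc = StructuredDocument(
    metadata=DocumentMetadata(title="Example Document"),
    sections=[section],
)
print(len(doc.get_all_tables()))      # -> 1
print(doc.model_dump_json(indent=2))  # serialized JSON payload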

View file

@ -347,6 +347,41 @@ class AiObjects:
logger.info(f"Selected by BALANCED: {selected}")
return selected
def _getFallbackModels(self, operationType: str) -> List[str]:
"""Get ordered list of fallback models for a given operation type."""
fallbackMappings = {
OperationType.GENERAL: [
"openai_callAiBasic_gpt35", # Fast and reliable
"openai_callAiBasic", # High quality
"anthropic_callAiBasic", # Alternative high quality
"perplexity_callAiBasic" # Cost effective
],
OperationType.IMAGE_ANALYSIS: [
"openai_callAiImage", # Primary image analysis
"anthropic_callAiImage" # Alternative image analysis
],
OperationType.IMAGE_GENERATION: [
"openai_generateImage" # Only image generation model
],
OperationType.WEB_RESEARCH: [
"perplexity_callAiWithWebSearch", # Primary web research
"perplexity_callAiBasic", # Alternative with web search
"openai_callAiBasic" # Fallback to general model
],
OperationType.GENERATE_PLAN: [
"anthropic_callAiBasic", # Best for planning
"openai_callAiBasic", # High quality alternative
"openai_callAiBasic_gpt35" # Fast fallback
],
OperationType.ANALYSE_CONTENT: [
"anthropic_callAiBasic", # Best for analysis
"openai_callAiBasic", # High quality alternative
"openai_callAiBasic_gpt35" # Fast fallback
]
}
return fallbackMappings.get(operationType, fallbackMappings[OperationType.GENERAL])
def _connectorFor(self, modelName: str):
"""Get the appropriate connector for the model."""
connectorType = aiModels[modelName]["connector"]
@ -362,7 +397,7 @@ class AiObjects:
raise ValueError(f"Unknown connector type: {connectorType}")
async def call(self, request: AiCallRequest) -> AiCallResponse:
"""Call AI model for text generation."""
"""Call AI model for text generation with fallback mechanism."""
prompt = request.prompt
context = request.context or ""
options = request.options
@ -379,9 +414,6 @@ class AiObjects:
if options.compressContext and len(context.encode("utf-8")) > 70000:
context = maybeTruncate(context, 70000)
# Select model for text generation
modelName = self._selectModel(prompt, context, options)
# Derive generation parameters
temperature = getattr(options, "temperature", None)
if temperature is None:
@ -398,58 +430,112 @@ class AiObjects:
messages.append({"role": "system", "content": f"Context from documents:\n{context}"})
messages.append({"role": "user", "content": prompt})
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
# Get fallback models for this operation type
fallbackModels = self._getFallbackModels(options.operationType)
# Call the appropriate function
if functionName == "callAiBasic":
if aiModels[modelName]["connector"] == "openai":
content = await connector.callAiBasic(messages, temperature=temperature, maxTokens=maxTokens)
elif aiModels[modelName]["connector"] == "perplexity":
content = await connector.callAiBasic(messages, temperature=temperature, maxTokens=maxTokens)
else:
response = await connector.callAiBasic(messages, temperature=temperature, maxTokens=maxTokens)
content = response["choices"][0]["message"]["content"]
elif functionName == "callAiWithWebSearch":
# Perplexity web search function
query = prompt
if context:
query = f"Context: {context}\n\nQuery: {prompt}"
content = await connector.callAiWithWebSearch(query)
elif functionName == "researchTopic":
# Perplexity research function
content = await connector.researchTopic(prompt)
elif functionName == "answerQuestion":
# Perplexity question answering function
content = await connector.answerQuestion(prompt, context)
elif functionName == "getCurrentNews":
# Perplexity news function
content = await connector.getCurrentNews(prompt)
else:
raise ValueError(f"Function {functionName} not supported for text generation")
# Try primary model first, then fallbacks
lastError = None
for attempt, modelName in enumerate(fallbackModels):
try:
logger.info(f"Attempting AI call with model: {modelName} (attempt {attempt + 1}/{len(fallbackModels)})")
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
# Call the appropriate function
if functionName == "callAiBasic":
if aiModels[modelName]["connector"] == "openai":
content = await connector.callAiBasic(messages, temperature=temperature, maxTokens=maxTokens)
elif aiModels[modelName]["connector"] == "perplexity":
content = await connector.callAiBasic(messages, temperature=temperature, maxTokens=maxTokens)
else:
response = await connector.callAiBasic(messages, temperature=temperature, maxTokens=maxTokens)
content = response["choices"][0]["message"]["content"]
elif functionName == "callAiWithWebSearch":
# Perplexity web search function
query = prompt
if context:
query = f"Context: {context}\n\nQuery: {prompt}"
content = await connector.callAiWithWebSearch(query)
elif functionName == "researchTopic":
# Perplexity research function
content = await connector.researchTopic(prompt)
elif functionName == "answerQuestion":
# Perplexity question answering function
content = await connector.answerQuestion(prompt, context)
elif functionName == "getCurrentNews":
# Perplexity news function
content = await connector.getCurrentNews(prompt)
else:
raise ValueError(f"Function {functionName} not supported for text generation")
# Estimate cost/tokens
totalSize = len((prompt + context).encode("utf-8"))
cost = self._estimateCost(aiModels[modelName], totalSize)
usedTokens = int(totalSize / 4)
# Success! Estimate cost/tokens and return
totalSize = len((prompt + context).encode("utf-8"))
cost = self._estimateCost(aiModels[modelName], totalSize)
usedTokens = int(totalSize / 4)
logger.info(f"✅ AI call successful with model: {modelName}")
return AiCallResponse(content=content, modelName=modelName, usedTokens=usedTokens, costEstimate=cost)
except Exception as e:
lastError = e
logger.warning(f"❌ AI call failed with model {modelName}: {str(e)}")
# If this is not the last model, try the next one
if attempt < len(fallbackModels) - 1:
logger.info(f"🔄 Trying next fallback model...")
continue
else:
# All models failed
logger.error(f"💥 All {len(fallbackModels)} models failed for operation {options.operationType}")
break
return AiCallResponse(content=content, modelName=modelName, usedTokens=usedTokens, costEstimate=cost)
# All fallback attempts failed
errorMsg = f"All AI models failed for operation {options.operationType}. Last error: {str(lastError)}"
logger.error(errorMsg)
raise Exception(errorMsg)
async def callImage(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: AiCallOptions = None) -> str:
"""Call AI model for image analysis."""
"""Call AI model for image analysis with fallback mechanism."""
if options is None:
options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS)
# Select model for image analysis
modelName = self._selectModel(prompt, "", options)
# Get fallback models for image analysis
fallbackModels = self._getFallbackModels(OperationType.IMAGE_ANALYSIS)
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
if functionName == "callAiImage":
return await connector.callAiImage(prompt, imageData, mimeType)
else:
raise ValueError(f"Function {functionName} not supported for image analysis")
# Try primary model first, then fallbacks
lastError = None
for attempt, modelName in enumerate(fallbackModels):
try:
logger.info(f"Attempting image analysis with model: {modelName} (attempt {attempt + 1}/{len(fallbackModels)})")
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
if functionName == "callAiImage":
content = await connector.callAiImage(prompt, imageData, mimeType)
logger.info(f"✅ Image analysis successful with model: {modelName}")
return content
else:
raise ValueError(f"Function {functionName} not supported for image analysis")
except Exception as e:
lastError = e
logger.warning(f"❌ Image analysis failed with model {modelName}: {str(e)}")
# If this is not the last model, try the next one
if attempt < len(fallbackModels) - 1:
logger.info(f"🔄 Trying next fallback model for image analysis...")
continue
else:
# All models failed
logger.error(f"💥 All {len(fallbackModels)} models failed for image analysis")
break
# All fallback attempts failed
errorMsg = f"All AI models failed for image analysis. Last error: {str(lastError)}"
logger.error(errorMsg)
raise Exception(errorMsg)
async def generateImage(self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: AiCallOptions = None) -> Dict[str, Any]:
"""Generate an image using AI."""

View file

@ -63,11 +63,41 @@ class AiService:
logger.info("AiObjects.create() completed")
return instance
def _createImageAnalysisPrompt(self, userPrompt: str) -> str:
def _createImageAnalysisPrompt(self, userPrompt: str, generate_json: bool = False) -> str:
"""Create a safe image analysis prompt based on the user's original request."""
# Escape the user prompt to prevent string formatting issues
safeUserPrompt = userPrompt.replace("'", "\\'").replace('"', '\\"').replace('\n', ' ').replace('\r', ' ')
return f"Based on the user's request: '{safeUserPrompt}', \n\nAnalyze this image completely. Provide a detailed description of everything you see. Include all text, logos, colors, fonts, layout, objects, and visual elements. This is a document processing system that requires complete image analysis for all content types including logos, graphics, simple images, flags, and animations. You must analyze and describe everything visible in the image without any disclaimers or refusals."
if generate_json:
return f"""Based on the user's request: '{safeUserPrompt}', analyze this image and extract all content as structured JSON.
You must respond with valid JSON only in this exact format:
{{
"metadata": {{
"title": "Image Content",
"version": "1.0"
}},
"sections": [
{{
"id": "image_content",
"type": "paragraph",
"data": {{
"text": "Detailed description of everything visible in the image"
}}
}}
]
}}
Analyze the image completely and describe everything you see including:
- All text content
- Tables or structured data
- Logos, graphics, and visual elements
- Layout and formatting
- Any other relevant information
Return only the JSON structure with actual content from the image. Do not include any text before or after the JSON."""
else:
return f"Based on the user's request: '{safeUserPrompt}', \n\nAnalyze this image completely. Provide a detailed description of everything you see. Include all text, logos, colors, fonts, layout, objects, and visual elements. This is a document processing system that requires complete image analysis for all content types including logos, graphics, simple images, flags, and animations. You must analyze and describe everything visible in the image without any disclaimers or refusals."
# AI Image Analysis
async def readImage(
@ -553,26 +583,18 @@ class AiService:
logger.error(f"Error in per-chunk processing: {str(e)}")
return f"[Error in per-chunk processing: {str(e)}]"
async def _processDocumentsPerChunkClean(
async def _processDocumentsPerChunkJson(
self,
documents: List[ChatDocument],
prompt: str,
options: Optional[AiCallOptions] = None
) -> str:
) -> Dict[str, Any]:
"""
Process documents with per-chunk AI calls and merge results in CLEAN mode.
This version excludes debug metadata and document headers for document generation.
Args:
documents: List of ChatDocument objects to process
prompt: AI prompt for processing
options: AI call options
Returns:
Clean merged AI results as string without debug metadata
Process documents with per-chunk AI calls and merge results in JSON mode.
Returns structured JSON document instead of text.
"""
if not documents:
return ""
return {"metadata": {"title": "Empty Document"}, "sections": []}
# Get model capabilities for size calculation
model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
@ -598,32 +620,33 @@ class AiService:
},
}
logger.debug(f"Per-chunk extraction options (clean mode): {extractionOptions}")
logger.debug(f"Per-chunk extraction options (JSON mode): {extractionOptions}")
try:
# Extract content with chunking
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
return "[Error: No extraction results]"
return {"metadata": {"title": "Error Document"}, "sections": []}
# Process chunks with proper mapping
chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options)
chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options, generate_json=True)
# Merge with CLEAN mode (no debug metadata)
mergedContent = self._mergeChunkResultsClean(chunkResults, options)
# Merge with JSON mode
mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options)
return mergedContent
return mergedJsonDocument
except Exception as e:
logger.error(f"Error in per-chunk processing (clean mode): {str(e)}")
return f"[Error in per-chunk processing: {str(e)}]"
logger.error(f"Error in per-chunk processing (JSON mode): {str(e)}")
return {"metadata": {"title": "Error Document"}, "sections": []}
async def _processChunksWithMapping(
self,
extractionResult: List[ContentExtracted],
prompt: str,
options: Optional[AiCallOptions] = None
options: Optional[AiCallOptions] = None,
generate_json: bool = False
) -> List[ChunkResult]:
"""Process chunks with proper mapping to preserve relationships."""
from modules.datamodels.datamodelExtraction import ChunkResult
@ -676,24 +699,107 @@ class AiService:
logger.info(f"Chunk {chunk_index}: document_mime_type={document_mime_type}, part.mimeType={part.mimeType}, part.typeGroup={part.typeGroup}, is_image={is_image}")
if is_image:
# Create image analysis prompt based on user's original intent
imagePrompt = self._createImageAnalysisPrompt(prompt)
# Use the same extraction prompt for image analysis (contains table JSON format)
ai_result = await self.readImage(
prompt=imagePrompt,
prompt=prompt,
imageData=part.data,
mimeType=part.mimeType,
options=options
)
# If generating JSON, clean image analysis result
if generate_json:
try:
import json
import re
# Clean the response - remove markdown code blocks if present
cleaned_result = ai_result.strip()
if cleaned_result.startswith('```json'):
# Remove ```json from start and ``` from end
cleaned_result = re.sub(r'^```json\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
elif cleaned_result.startswith('```'):
# Remove ``` from start and end
cleaned_result = re.sub(r'^```\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
# Validate JSON
json.loads(cleaned_result)
ai_result = cleaned_result # Use cleaned version
except json.JSONDecodeError as e:
logger.warning(f"Image chunk {chunk_index} returned invalid JSON: {str(e)}")
# Create fallback JSON
ai_result = json.dumps({
"metadata": {"title": "Error Section"},
"sections": [{
"id": f"error_section_{chunk_index}",
"type": "paragraph",
"data": {"text": f"Error parsing JSON: {str(e)}"}
}]
})
elif part.typeGroup in ("container", "binary"):
# Handle container and binary content as text (skip processing)
ai_result = f"[Skipped {part.typeGroup} content: {len(part.data)} bytes]"
# Handle ALL container and binary content generically - let AI process any document type
print(f"🔍 DEBUG: Chunk {chunk_index}: typeGroup={part.typeGroup}, mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}")
if part.mimeType and part.data and len(part.data.strip()) > 0:
# Process any document container as text content
request_options = options if options is not None else AiCallOptions()
request_options.operationType = OperationType.GENERAL
print(f"🔍 Chunk {chunk_index}: Processing {part.mimeType} container as text with generate_json={generate_json}")
logger.info(f"Chunk {chunk_index}: Processing {part.mimeType} container as text with generate_json={generate_json}")
request = AiCallRequest(
prompt=prompt,
context=part.data,
options=request_options
)
response = await self.aiObjects.call(request)
ai_result = response.content
# If generating JSON, validate the response
if generate_json:
try:
import json
import re
# Clean the response - remove markdown code blocks if present
cleaned_result = ai_result.strip()
if cleaned_result.startswith('```json'):
# Remove ```json from start and ``` from end
cleaned_result = re.sub(r'^```json\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
elif cleaned_result.startswith('```'):
# Remove ``` from start and end
cleaned_result = re.sub(r'^```\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
# Validate JSON
json.loads(cleaned_result)
ai_result = cleaned_result # Use cleaned version
except json.JSONDecodeError as e:
logger.warning(f"Container chunk {chunk_index} ({part.mimeType}) returned invalid JSON: {str(e)}")
# Create fallback JSON
ai_result = json.dumps({
"metadata": {"title": "Error Section"},
"sections": [{
"id": f"error_section_{chunk_index}",
"type": "paragraph",
"data": {"text": f"Error parsing JSON: {str(e)}"}
}]
})
else:
# Skip empty or invalid container/binary content - don't create a result
print(f"🔍 DEBUG: Chunk {chunk_index}: Skipping empty container - mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}")
# Return None to indicate this chunk should be completely skipped
return None
else:
# Ensure options is not None and set correct operation type for text
request_options = options if options is not None else AiCallOptions()
# FIXED: Set operation type to general for text processing
request_options.operationType = OperationType.GENERAL
print(f"🔍 Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}")
logger.info(f"Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}")
print(f"🔍 Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}, generate_json={generate_json}")
logger.info(f"Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}, generate_json={generate_json}")
request = AiCallRequest(
prompt=prompt,
context=part.data,
@ -701,6 +807,39 @@ class AiService:
)
response = await self.aiObjects.call(request)
ai_result = response.content
# If generating JSON, validate the response
if generate_json:
try:
import json
import re
# Clean the response - remove markdown code blocks if present
cleaned_result = ai_result.strip()
if cleaned_result.startswith('```json'):
# Remove ```json from start and ``` from end
cleaned_result = re.sub(r'^```json\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
elif cleaned_result.startswith('```'):
# Remove ``` from start and end
cleaned_result = re.sub(r'^```\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
# Validate JSON
json.loads(cleaned_result)
ai_result = cleaned_result # Use cleaned version
except json.JSONDecodeError as e:
logger.warning(f"Chunk {chunk_index} returned invalid JSON: {str(e)}")
# Create fallback JSON
ai_result = json.dumps({
"metadata": {"title": "Error Section"},
"sections": [{
"id": f"error_section_{chunk_index}",
"type": "paragraph",
"data": {"text": f"Error parsing JSON: {str(e)}"}
}]
})
processing_time = time.time() - start_time
@ -746,6 +885,9 @@ class AiService:
max_concurrent = options.maxParallelChunks
logger.info(f"Processing {len(chunks_to_process)} chunks with max concurrency: {max_concurrent}")
print(f"🔍 DEBUG: Chunks to process: {len(chunks_to_process)}")
for i, chunk_info in enumerate(chunks_to_process):
print(f"🔍 DEBUG: Chunk {i}: typeGroup={chunk_info['part'].typeGroup}, mimeType={chunk_info['part'].mimeType}, data_length={len(chunk_info['part'].data) if chunk_info['part'].data else 0}")
# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(max_concurrent)
@ -756,7 +898,9 @@ class AiService:
# Process all chunks in parallel with concurrency control
tasks = [process_with_semaphore(chunk_info) for chunk_info in chunks_to_process]
print(f"🔍 DEBUG: Created {len(tasks)} tasks for parallel processing")
chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"🔍 DEBUG: Got {len(chunk_results)} results from parallel processing")
# Handle any exceptions in the gather itself
processed_results = []
@ -772,7 +916,8 @@ class AiService:
processingTime=0.0,
metadata={"success": False, "error": str(result)}
))
else:
elif result is not None:
# Only add non-None results (skip empty containers)
processed_results.append(result)
logger.info(f"Completed processing {len(processed_results)} chunks")
@ -926,8 +1071,111 @@ class AiService:
# Join all documents
final_result = "\n\n".join(merged_documents)
logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents (clean mode)")
return final_result.strip()
def _mergeChunkResultsJson(
self,
chunkResults: List[ChunkResult],
options: Optional[AiCallOptions] = None
) -> Dict[str, Any]:
"""Merge chunk results in JSON mode - returns structured JSON document."""
import json
if not chunkResults:
return {"metadata": {"title": "Empty Document"}, "sections": []}
# Group chunk results by document
results_by_document = {}
for chunk_result in chunkResults:
doc_id = chunk_result.documentId
if doc_id not in results_by_document:
results_by_document[doc_id] = []
results_by_document[doc_id].append(chunk_result)
# Sort chunks within each document by chunk index
for doc_id in results_by_document:
results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
# Merge JSON results for each document
all_sections = []
document_titles = []
for doc_id, doc_chunks in results_by_document.items():
# Process each chunk's JSON result
for chunk_result in doc_chunks:
chunk_metadata = chunk_result.metadata
if chunk_metadata.get("success", False):
try:
# Parse JSON from AI result
chunk_json = json.loads(chunk_result.aiResult)
# Extract sections from this chunk
if isinstance(chunk_json, dict) and "sections" in chunk_json:
for section in chunk_json["sections"]:
# Add document context to section
section["metadata"] = section.get("metadata", {})
section["metadata"]["source_document"] = doc_id
section["metadata"]["chunk_index"] = chunk_result.chunkIndex
all_sections.append(section)
# Extract document title
if isinstance(chunk_json, dict) and "metadata" in chunk_json:
title = chunk_json["metadata"].get("title", "")
if title and title not in document_titles:
document_titles.append(title)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON from chunk {chunk_result.chunkIndex}: {str(e)}")
# Create a fallback section for invalid JSON
fallback_section = {
"id": f"error_section_{chunk_result.chunkIndex}",
"title": "Error Section",
"content_type": "paragraph",
"elements": [{
"text": f"Error parsing chunk {chunk_result.chunkIndex}: {str(e)}"
}],
"order": chunk_result.chunkIndex,
"metadata": {
"source_document": doc_id,
"chunk_index": chunk_result.chunkIndex,
"error": str(e)
}
}
all_sections.append(fallback_section)
else:
# Handle error chunks
error_section = {
"id": f"error_section_{chunk_result.chunkIndex}",
"title": "Error Section",
"content_type": "paragraph",
"elements": [{
"text": f"Error in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}"
}],
"order": chunk_result.chunkIndex,
"metadata": {
"source_document": doc_id,
"chunk_index": chunk_result.chunkIndex,
"error": chunk_metadata.get('error', 'Unknown error')
}
}
all_sections.append(error_section)
# Sort sections by order
all_sections.sort(key=lambda x: x.get("order", 0))
# Create merged document
merged_document = {
"metadata": {
"title": document_titles[0] if document_titles else "Merged Document",
"source_documents": list(results_by_document.keys()),
"extraction_method": "ai_json_extraction",
"version": "1.0"
},
"sections": all_sections,
"summary": f"Merged document from {len(results_by_document)} source documents",
"tags": ["merged", "ai_generated"]
}
logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents (JSON mode)")
return merged_document
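# Illustrative sketch (hypothetical helper, not part of this commit): the markdown
# fence-stripping and JSON-validation block repeated for image, container, and text
# chunks in _processChunksWithMapping above could be centralized in one method.
def _cleanJsonResponse(self, ai_result: str, chunk_index: int) -> str:
    """Strip ``` / ```json fences and fall back to an error section on invalid JSON."""
    import json
    import re
    cleaned = ai_result.strip()
    cleaned = re.sub(r'^```(?:json)?\s*', '', cleaned)
    cleaned = re.sub(r'\s*```$', '', cleaned)
    try:
        json.loads(cleaned)
        return cleaned
    except json.JSONDecodeError as e:
        return json.dumps({
            "metadata": {"title": "Error Section"},
            "sections": [{
                "id": f"error_section_{chunk_index}",
                "type": "paragraph",
                "data": {"text": f"Error parsing JSON: {str(e)}"}
            }]
        })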
async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
if len(content.encode("utf-8")) <= targetSize:
@ -1194,21 +1442,21 @@ class AiService:
# This ensures MIME-type checking, chunk mapping, and parallel processing
return await self._processDocumentsPerChunk(documents, prompt, options)
async def _callAiTextClean(
async def _callAiJson(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions
) -> str:
) -> Dict[str, Any]:
"""
Handle text calls with document processing in CLEAN mode for document generation.
This version excludes debug metadata and document headers from the final output.
Handle AI calls with document processing for JSON output.
Returns structured JSON document instead of text.
"""
# Ensure aiObjects is initialized
await self._ensureAiObjectsInitialized()
# Process documents with clean merging (no debug metadata)
return await self._processDocumentsPerChunkClean(documents, prompt, options)
# Process documents with JSON merging
return await self._processDocumentsPerChunkJson(documents, prompt, options)
@ -1522,35 +1770,31 @@ class AiService:
aiService=self
)
# Process documents with format-specific prompt using CLEAN mode
# This ensures no debug metadata is included in the final output
aiResponse = await self._callAiTextClean(extractionPrompt, documents, options)
# Process documents with format-specific prompt using JSON mode
# This ensures structured JSON output instead of text
aiResponseJson = await self._callAiJson(extractionPrompt, documents, options)
# Parse filename header from AI response if present
# Validate JSON response
if not isinstance(aiResponseJson, dict) or "sections" not in aiResponseJson:
raise Exception("AI response is not valid JSON document structure")
# Generate filename from document metadata
parsedFilename = None
try:
if aiResponse:
firstNewline = aiResponse.find('\n')
headerLine = aiResponse if firstNewline == -1 else aiResponse[:firstNewline]
if headerLine.strip().lower().startswith('filename:'):
parsed = headerLine.split(':', 1)[1].strip()
# basic sanitization
import re
parsed = re.sub(r"[^a-zA-Z0-9._-]", "-", parsed)
parsed = re.sub(r"-+", "-", parsed).strip('-')
if parsed:
parsedFilename = parsed
# remove header line from content for rendering
aiResponse = aiResponse[firstNewline+1:].lstrip('\n') if firstNewline != -1 else ''
if aiResponseJson.get("metadata", {}).get("title"):
title = aiResponseJson["metadata"]["title"]
# Clean title for filename
import re
parsed = re.sub(r"[^a-zA-Z0-9._-]", "-", title)
parsed = re.sub(r"-+", "-", parsed).strip('-')
if parsed:
parsedFilename = f"{parsed}.{outputFormat}"
except Exception:
parsedFilename = None
if not aiResponse or aiResponse.strip() == "":
raise Exception("AI content generation failed")
# Render the content to the specified format
# Render the JSON content to the specified format
renderedContent, mimeType = await generation_service.renderReport(
extractedContent=aiResponse,
extractedContent=aiResponseJson,
outputFormat=outputFormat,
title=title,
userPrompt=prompt,
@ -1569,7 +1813,7 @@ class AiService:
# Return structured result with document information
return {
"success": True,
"content": aiResponse, # Raw AI response
"content": aiResponseJson, # Structured JSON document
"rendered_content": renderedContent, # Formatted content
"mime_type": mimeType,
"filename": filename,

View file

@ -296,12 +296,12 @@ class GenerationService:
'workflowId': 'unknown'
}
async def renderReport(self, extractedContent: str, outputFormat: str, title: str, userPrompt: str = None, aiService=None) -> tuple[str, str]:
async def renderReport(self, extractedContent: Dict[str, Any], outputFormat: str, title: str, userPrompt: str = None, aiService=None) -> tuple[str, str]:
"""
Render extracted content to the specified output format.
Render extracted JSON content to the specified output format.
Args:
extractedContent: Content extracted by AI using format-specific prompt
extractedContent: Structured JSON document from AI extraction
outputFormat: Target format (html, pdf, docx, txt, md, json, csv, xlsx)
title: Report title
userPrompt: User's original prompt for report generation
@ -311,17 +311,25 @@ class GenerationService:
tuple: (rendered_content, mime_type)
"""
try:
# DEBUG: dump renderer input to diagnose JSON+HTML mixtures TODO REMOVE
# Validate JSON input
if not isinstance(extractedContent, dict):
raise ValueError("extractedContent must be a JSON dictionary")
if "sections" not in extractedContent:
raise ValueError("extractedContent must contain 'sections' field")
# DEBUG: dump renderer input to diagnose JSON structure TODO REMOVE
try:
import os
import json
ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
debug_root = "./test-chat/ai"
debug_dir = os.path.join(debug_root, f"render_input_{ts}")
os.makedirs(debug_dir, exist_ok=True)
with open(os.path.join(debug_dir, "meta.txt"), "w", encoding="utf-8") as f:
f.write(f"title: {title}\nformat: {outputFormat}\nlength: {len(extractedContent or '')}\nstarts_with_brace: {str(extractedContent.strip().startswith('{') if extractedContent else False)}\n")
with open(os.path.join(debug_dir, "extracted_content.txt"), "w", encoding="utf-8") as f:
f.write(extractedContent or "")
f.write(f"title: {title}\nformat: {outputFormat}\ncontent_type: {type(extractedContent).__name__}\n")
with open(os.path.join(debug_dir, "extracted_content.json"), "w", encoding="utf-8") as f:
json.dump(extractedContent, f, indent=2, ensure_ascii=False)
except Exception:
pass
@ -334,7 +342,7 @@ class GenerationService:
generationPrompt = userPrompt # Default to user prompt
if aiService and userPrompt:
try:
from .prompt_builder import buildGenerationPrompt
from .subPromptBuilder import buildGenerationPrompt
generationPrompt = await buildGenerationPrompt(
outputFormat=outputFormat,
userPrompt=userPrompt,
@ -345,8 +353,8 @@ class GenerationService:
logger.warning(f"Failed to generate AI-based generation prompt: {str(e)}, using user prompt")
generationPrompt = userPrompt
# Render the content with AI-generated prompt
renderedContent, mimeType = await renderer.render(extractedContent, title, generationPrompt)
# Render the JSON content with AI-generated prompt
renderedContent, mimeType = await renderer.render(extractedContent, title, generationPrompt, aiService)
# DEBUG: dump rendered output
try:
import os
@ -355,11 +363,11 @@ class GenerationService:
except Exception:
pass
logger.info(f"Successfully rendered report to {outputFormat} format: {len(renderedContent)} characters")
logger.info(f"Successfully rendered JSON report to {outputFormat} format: {len(renderedContent)} characters")
return renderedContent, mimeType
except Exception as e:
logger.error(f"Error rendering report to {outputFormat}: {str(e)}")
logger.error(f"Error rendering JSON report to {outputFormat}: {str(e)}")
raise
async def getExtractionPrompt(self, outputFormat: str, userPrompt: str, title: str, aiService=None) -> str:
@ -382,7 +390,7 @@ class GenerationService:
raise ValueError(f"Unsupported output format: {outputFormat}")
# Build centralized prompt with generic rules + format-specific guidelines
from .prompt_builder import buildExtractionPrompt
from .subPromptBuilder import buildExtractionPrompt
extractionPrompt = await buildExtractionPrompt(
outputFormat=outputFormat,
renderer=renderer,

View file

@ -1,164 +0,0 @@
"""
Centralized prompt builder for document generation across formats.
Builds a robust prompt that:
- Accepts any user intent (no fixed structure assumptions)
- Injects format-specific guidelines from the selected renderer
- Adds a common policy section to always use real data from source docs
- Requires the AI to output a filename header that we can parse and use
"""
from typing import Protocol
class _RendererLike(Protocol):
def getExtractionPrompt(self, user_prompt: str, title: str) -> str: # returns only format-specific guidelines
...
async def buildExtractionPrompt(
outputFormat: str,
renderer: _RendererLike,
userPrompt: str,
title: str,
aiService=None
) -> str:
"""
Build the final extraction prompt by combining:
- Parsed extraction intent from user prompt (using AI)
- Generic cross-format instructions (filename header + real-data policy)
- Format-specific guidelines snippet provided by the renderer
The AI must place a single filename header at the very top:
FILENAME: <safe-file-name-with-extension>
followed by a blank line and then ONLY the document content according to the target format.
"""
# Parse user prompt to separate extraction intent from generation format using AI
extractionIntent = await _parseExtractionIntent(userPrompt, outputFormat, aiService)
formatGuidelines = renderer.getExtractionPrompt(userPrompt, title)
# Generic block appears once for every format
genericIntro = f"""
{extractionIntent}
You are generating a document in {outputFormat.upper()} format for the title: "{title}".
Rules:
- The user's intent fully defines the structure. Do not assume a fixed template or headings.
- Work with whatever data is available from the source documents - partial data is better than no data.
- If some information is missing, create the best possible document with what you have available.
- Do not refuse to generate the document due to incomplete data - always proceed with available information.
- The output must strictly follow the target format and be ready for saving without extra wrapping.
- At the VERY TOP output exactly one line with the filename header:
FILENAME: <safe-file-name-with-extension>
- The base name should be short, descriptive, and kebab-case or snake-case without spaces.
- Include the correct extension for the requested format (e.g., .html, .pdf, .docx, .md, .txt, .json, .csv, .xlsx).
- Avoid special characters beyond [a-zA-Z0-9-_].
- After this header, insert a single blank line and then provide ONLY the document content.
Common policy:
- Use the actual data from the source documents to create the content.
- If data is incomplete, work with what you have and create a meaningful document.
- Always generate the document - never refuse due to missing information.
- Extract and use the real data provided in the source documents to create meaningful content.
""".strip()
# Final assembly
finalPrompt = (
genericIntro
+ "\n\nFORMAT-SPECIFIC GUIDELINES:\n"
+ formatGuidelines.strip()
+ "\n\nGenerate the complete document content now based on the source documents below:"
)
return finalPrompt
async def buildGenerationPrompt(
outputFormat: str,
userPrompt: str,
title: str,
aiService=None
) -> str:
"""
Use AI to build the generation prompt based on user intent and format requirements.
Focus on what's important for the user and how to structure the content.
"""
if not aiService:
# Fallback if no AI service available
return f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content."
try:
# Protect userPrompt from injection
safeUserPrompt = userPrompt.replace('"', '\\"').replace("'", "\\'").replace('\n', ' ').replace('\r', ' ')
# AI call to generate the appropriate generation prompt
generationPromptRequest = f"""
Based on this user request, create a detailed generation prompt for creating a {outputFormat} document.
User request: "{safeUserPrompt}"
Document title: "{title}"
Output format: {outputFormat}
Create a generation prompt that:
1. Identifies what content is most important for the user
2. Specifies how to structure and organize the content, adding your own suggestions for structure to best match the user's intention.
3. Includes any specific formatting or presentation requirements
4. Ensures the document meets the user's needs
Return only the generation prompt, starting with "Generate a {outputFormat} document that..."
"""
# Call AI service to generate the prompt
result = await aiService.callAi(
prompt=generationPromptRequest,
documents=None,
options=None
)
return result if result else f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content."
except Exception:
# Fallback on any error
return f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content."
async def _parseExtractionIntent(userPrompt: str, outputFormat: str, aiService=None) -> str:
"""
Use AI to extract the core content intention from the user prompt.
Focus on WHAT the user wants to extract, not HOW to format it.
"""
if not aiService:
# Fallback if no AI service available
return "Extract all relevant content from the document according to the user's requirements"
try:
# Protect userPrompt from injection by escaping quotes and newlines
safeUserPrompt = userPrompt.replace('"', '\\"').replace("'", "\\'").replace('\n', ' ').replace('\r', ' ')
# Simple AI call to extract the intention
extractionPrompt = f"""
Extract the core content intention from this user request. Focus on WHAT content they want.
User request: "{safeUserPrompt}"
Return only the content intention in a simple format like "Extract: [content description]"
Do not include formatting instructions, file types, or output methods.
"""
# Call AI service to extract intention
result = await aiService.callAi(
prompt=extractionPrompt,
documents=None,
options=None
)
return result if result else "Extract all relevant content from the document according to the user's requirements"
except Exception:
# Fallback on any error
return "Extract all relevant content from the document according to the user's requirements"

View file

@ -29,20 +29,21 @@ class CsvRenderer(BaseRenderer):
"""Return only CSV-specific guidelines; global prompt is built centrally."""
return (
"CSV FORMAT GUIDELINES:\n"
"- Emit ONLY CSV text without fences or commentary.\n"
"- Include a single header row with clear column names.\n"
"- Quote fields containing commas, quotes, or newlines; escape quotes by doubling them.\n"
"- Use rows to represent items/records derived from sources.\n"
"- Keep cells concise; include units in headers when useful.\n"
"OUTPUT: Return ONLY valid CSV content that can be imported."
"- Extract structured data from source documents into JSON format\n"
"- Focus on tabular data, lists, and structured information\n"
"- For tables: Extract headers and rows as separate arrays\n"
"- For lists: Extract items with optional sub-items\n"
"- Structure content into sections with clear content types\n"
"- Use proper JSON structure with metadata, sections, and elements\n"
"- Ensure data is clean and ready for CSV conversion\n"
"OUTPUT: Return structured JSON that can be converted to CSV format."
)
async def render(self, extracted_content: str, title: str) -> Tuple[str, str]:
"""Render extracted content to CSV format."""
async def render(self, extracted_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> Tuple[str, str]:
"""Render extracted JSON content to CSV format."""
try:
# The extracted content should already be CSV from the AI
# Just clean it up
csv_content = self._clean_csv_content(extracted_content, title)
# Generate CSV directly from JSON (no styling needed for CSV)
csv_content = await self._generate_csv_from_json(extracted_content, title)
return csv_content, "text/csv"
@ -51,6 +52,215 @@ class CsvRenderer(BaseRenderer):
# Return minimal CSV fallback
return f"Title,Content\n{title},Error rendering report: {str(e)}", "text/csv"
async def _generate_csv_from_json(self, json_content: Dict[str, Any], title: str) -> str:
"""Generate CSV content from structured JSON document."""
try:
# Validate JSON structure
if not isinstance(json_content, dict):
raise ValueError("JSON content must be a dictionary")
if "sections" not in json_content:
raise ValueError("JSON content must contain 'sections' field")
# Use title from JSON metadata if available, otherwise use provided title
document_title = json_content.get("metadata", {}).get("title", title)
# Generate CSV content
csv_rows = []
# Add title row
if document_title:
csv_rows.append([document_title])
csv_rows.append([]) # Empty row
# Process each section in order
sections = json_content.get("sections", [])
for section in sections:
section_csv = self._render_json_section_to_csv(section)
if section_csv:
csv_rows.extend(section_csv)
csv_rows.append([]) # Empty row between sections
# Convert to CSV string
csv_content = self._convert_rows_to_csv(csv_rows)
return csv_content
except Exception as e:
self.logger.error(f"Error generating CSV from JSON: {str(e)}")
raise Exception(f"CSV generation failed: {str(e)}")
def _render_json_section_to_csv(self, section: Dict[str, Any]) -> List[List[str]]:
"""Render a single JSON section to CSV rows."""
try:
section_type = section.get("content_type", "paragraph")
elements = section.get("elements", [])
csv_rows = []
# Add section title if available
section_title = section.get("title")
if section_title:
csv_rows.append([f"# {section_title}"])
# Process each element in the section
for element in elements:
if section_type == "table":
csv_rows.extend(self._render_json_table_to_csv(element))
elif section_type == "list":
csv_rows.extend(self._render_json_list_to_csv(element))
elif section_type == "heading":
csv_rows.extend(self._render_json_heading_to_csv(element))
elif section_type == "paragraph":
csv_rows.extend(self._render_json_paragraph_to_csv(element))
elif section_type == "code":
csv_rows.extend(self._render_json_code_to_csv(element))
else:
# Fallback to paragraph for unknown types
csv_rows.extend(self._render_json_paragraph_to_csv(element))
return csv_rows
except Exception as e:
self.logger.warning(f"Error rendering section {section.get('id', 'unknown')}: {str(e)}")
return [["[Error rendering section]"]]
def _render_json_table_to_csv(self, table_data: Dict[str, Any]) -> List[List[str]]:
"""Render a JSON table to CSV rows."""
try:
headers = table_data.get("headers", [])
rows = table_data.get("rows", [])
csv_rows = []
if headers:
csv_rows.append(headers)
if rows:
csv_rows.extend(rows)
return csv_rows
except Exception as e:
self.logger.warning(f"Error rendering table: {str(e)}")
return [["[Error rendering table]"]]
def _render_json_list_to_csv(self, list_data: Dict[str, Any]) -> List[List[str]]:
"""Render a JSON list to CSV rows."""
try:
items = list_data.get("items", [])
csv_rows = []
for item in items:
if isinstance(item, dict):
text = item.get("text", "")
subitems = item.get("subitems", [])
csv_rows.append([text])
# Add subitems as indented rows
for subitem in subitems:
if isinstance(subitem, dict):
csv_rows.append([f" - {subitem.get('text', '')}"])
else:
csv_rows.append([f" - {subitem}"])
else:
csv_rows.append([str(item)])
return csv_rows
except Exception as e:
self.logger.warning(f"Error rendering list: {str(e)}")
return [["[Error rendering list]"]]
def _render_json_heading_to_csv(self, heading_data: Dict[str, Any]) -> List[List[str]]:
"""Render a JSON heading to CSV rows."""
try:
text = heading_data.get("text", "")
level = heading_data.get("level", 1)
if text:
# Use # symbols for heading levels
heading_text = f"{'#' * level} {text}"
return [[heading_text]]
return []
except Exception as e:
self.logger.warning(f"Error rendering heading: {str(e)}")
return [["[Error rendering heading]"]]
def _render_json_paragraph_to_csv(self, paragraph_data: Dict[str, Any]) -> List[List[str]]:
"""Render a JSON paragraph to CSV rows."""
try:
text = paragraph_data.get("text", "")
if text:
# Split long paragraphs into multiple rows if needed
if len(text) > 100:
words = text.split()
rows = []
current_row = []
current_length = 0
for word in words:
if current_length + len(word) > 100 and current_row:
rows.append([" ".join(current_row)])
current_row = [word]
current_length = len(word)
else:
current_row.append(word)
current_length += len(word) + 1
if current_row:
rows.append([" ".join(current_row)])
return rows
else:
return [[text]]
return []
except Exception as e:
self.logger.warning(f"Error rendering paragraph: {str(e)}")
return [["[Error rendering paragraph]"]]
def _render_json_code_to_csv(self, code_data: Dict[str, Any]) -> List[List[str]]:
"""Render a JSON code block to CSV rows."""
try:
code = code_data.get("code", "")
language = code_data.get("language", "")
csv_rows = []
if language:
csv_rows.append([f"Code ({language}):"])
if code:
# Split code into lines
code_lines = code.split('\n')
for line in code_lines:
csv_rows.append([f" {line}"])
return csv_rows
except Exception as e:
self.logger.warning(f"Error rendering code block: {str(e)}")
return [["[Error rendering code block]"]]
def _convert_rows_to_csv(self, rows: List[List[str]]) -> str:
"""Convert rows to CSV string."""
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
for row in rows:
if row: # Only write non-empty rows
writer.writerow(row)
return output.getvalue()
def _clean_csv_content(self, content: str, title: str) -> str:
"""Clean and validate CSV content from AI."""
content = content.strip()

View file

@ -7,11 +7,12 @@ from typing import Dict, Any, Tuple, List
import io
import base64
import re
import os
from datetime import datetime, UTC
try:
from docx import Document
from docx.shared import Inches, Pt
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.oxml.shared import OxmlElement, qn
@ -43,18 +44,21 @@ class DocxRenderer(BaseRenderer):
"""Return only DOCX-specific guidelines; global prompt is built centrally."""
return (
"DOCX FORMAT GUIDELINES:\n"
"- Extract the ACTUAL table data, lists, and content from the source documents\n"
"- For tables: Extract all rows and columns in pipe-separated format (Column1 | Column2 | Column3)\n"
"- For lists: Extract the actual list items, not summaries\n"
"- Structure your response with clear headings using numbered format: 1) Heading, 2) Heading, etc.\n"
"- Use bullet points (-) for lists and sub-items\n"
"- Use **bold** for emphasis on key terms\n"
"- Use pipe-separated format (Item | Status) for tables when appropriate\n"
"- Provide clean, structured content that can be directly converted to Word formatting\n"
"- Do NOT include debug information, separators (---), metadata, or FILENAME headers\n"
"- Start directly with your content - no introductory text or separators\n"
"- Extract raw data, not analysis or summaries\n"
"OUTPUT: Return ONLY the structured plain text to be converted into DOCX."
)
async def render(self, extracted_content: str, title: str, user_prompt: str = None) -> Tuple[str, str]:
"""Render extracted content to DOCX format using user prompt as blueprint."""
async def render(self, extracted_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> Tuple[str, str]:
"""Render extracted JSON content to DOCX format using AI-analyzed styling."""
try:
if not DOCX_AVAILABLE:
# Fallback to HTML if python-docx not available
@ -63,8 +67,8 @@ class DocxRenderer(BaseRenderer):
html_content, _ = await html_renderer.render(extracted_content, title)
return html_content, "text/html"
# Generate DOCX using prompt-based structure
docx_content = self._generate_docx_from_prompt(extracted_content, title, user_prompt)
# Generate DOCX using AI-analyzed styling
docx_content = await self._generate_docx_from_json(extracted_content, title, user_prompt, ai_service)
return docx_content, "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
@ -73,20 +77,37 @@ class DocxRenderer(BaseRenderer):
# Return minimal fallback
return f"DOCX Generation Error: {str(e)}", "text/plain"
def _generate_docx_from_prompt(self, content: str, title: str, user_prompt: str = None) -> str:
"""Generate DOCX content by parsing the AI-generated structured content."""
async def _generate_docx_from_json(self, json_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> str:
"""Generate DOCX content from structured JSON document using AI-generated styling."""
try:
# Create new document
doc = Document()
# Set up document styles
self._setup_document_styles(doc)
# Get AI-generated styling definitions
styles = await self._get_docx_styles(user_prompt, ai_service)
# Clean the content - remove debug information
clean_content = self._clean_ai_content(content)
# Apply basic document setup
self._setup_basic_document_styles(doc)
# Parse and convert the structured content to DOCX
self._parse_and_format_content(doc, clean_content, title)
# Validate JSON structure
if not isinstance(json_content, dict):
raise ValueError("JSON content must be a dictionary")
if "sections" not in json_content:
raise ValueError("JSON content must contain 'sections' field")
# Use title from JSON metadata if available, otherwise use provided title
document_title = json_content.get("metadata", {}).get("title", title)
# Add document title using analyzed styles
if document_title:
title_heading = doc.add_heading(document_title, level=1)
title_heading.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Process each section in order
sections = json_content.get("sections", [])
for section in sections:
self._render_json_section(doc, section, styles)
# Save to buffer
buffer = io.BytesIO()
@ -100,9 +121,405 @@ class DocxRenderer(BaseRenderer):
return docx_base64
except Exception as e:
self.logger.error(f"Error generating DOCX from prompt: {str(e)}")
self.logger.error(f"Error generating DOCX from JSON: {str(e)}")
raise Exception(f"DOCX generation failed: {str(e)}")
async def _get_docx_styles(self, user_prompt: str, ai_service=None) -> Dict[str, Any]:
"""Simple AI call to get DOCX styling definitions."""
if not ai_service:
return self._get_default_styles()
try:
prompt = f"""
For this DOCX document request: "{user_prompt}"
Provide styling definitions for DOCX elements. IMPORTANT: Ensure proper contrast - never use white text on white background or dark text on dark background. Respond with ONLY JSON:
{{
"title": {{"font_size": 24, "color": "#1F4E79", "bold": true, "align": "center"}},
"heading1": {{"font_size": 18, "color": "#2F2F2F", "bold": true, "align": "left"}},
"heading2": {{"font_size": 14, "color": "#4F4F4F", "bold": true, "align": "left"}},
"paragraph": {{"font_size": 11, "color": "#2F2F2F", "bold": false, "align": "left"}},
"table_header": {{"background": "#4F4F4F", "text_color": "#FFFFFF", "bold": true, "align": "center"}},
"table_cell": {{"background": "#FFFFFF", "text_color": "#2F2F2F", "bold": false, "align": "left"}},
"table_border": {{"style": "horizontal_only", "color": "#000000", "thickness": "thin"}},
"bullet_list": {{"font_size": 11, "color": "#2F2F2F", "indent": 20}},
"code_block": {{"font": "Courier New", "font_size": 10, "color": "#2F2F2F", "background": "#F5F5F5"}}
}}
CRITICAL: Table headers must have dark background with light text, table cells must have light background with dark text for readability.
"""
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
request_options = AiCallOptions()
request_options.operationType = OperationType.GENERAL
request = AiCallRequest(prompt=prompt, context="", options=request_options)
response = await ai_service.aiObjects.call(request)
import json
import re
# Clean and parse JSON
result = response.content.strip()
if result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
styles = json.loads(result)
# Validate and fix contrast issues
styles = self._validate_styles_contrast(styles)
return styles
except Exception as e:
self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
return self._get_default_styles()
def _validate_styles_contrast(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and fix contrast issues in AI-generated styles."""
try:
# Fix table header contrast
if "table_header" in styles:
header = styles["table_header"]
bg_color = header.get("background", "#FFFFFF")
text_color = header.get("text_color", "#000000")
# If both are white or both are dark, fix it
if bg_color.upper() == "#FFFFFF" and text_color.upper() == "#FFFFFF":
header["background"] = "#4F4F4F"
header["text_color"] = "#FFFFFF"
elif bg_color.upper() == "#000000" and text_color.upper() == "#000000":
header["background"] = "#4F4F4F"
header["text_color"] = "#FFFFFF"
# Fix table cell contrast
if "table_cell" in styles:
cell = styles["table_cell"]
bg_color = cell.get("background", "#FFFFFF")
text_color = cell.get("text_color", "#000000")
# If both are white or both are dark, fix it
if bg_color.upper() == "#FFFFFF" and text_color.upper() == "#FFFFFF":
cell["background"] = "#FFFFFF"
cell["text_color"] = "#2F2F2F"
elif bg_color.upper() == "#000000" and text_color.upper() == "#000000":
cell["background"] = "#FFFFFF"
cell["text_color"] = "#2F2F2F"
return styles
except Exception as e:
self.logger.warning(f"Style validation failed: {str(e)}")
return self._get_default_styles()
def _get_default_styles(self) -> Dict[str, Any]:
"""Default DOCX styles."""
return {
"title": {"font_size": 24, "color": "#1F4E79", "bold": True, "align": "center"},
"heading1": {"font_size": 18, "color": "#2F2F2F", "bold": True, "align": "left"},
"heading2": {"font_size": 14, "color": "#4F4F4F", "bold": True, "align": "left"},
"paragraph": {"font_size": 11, "color": "#2F2F2F", "bold": False, "align": "left"},
"table_header": {"background": "#4F4F4F", "text_color": "#FFFFFF", "bold": True, "align": "center"},
"table_cell": {"background": "#FFFFFF", "text_color": "#2F2F2F", "bold": False, "align": "left"},
"table_border": {"style": "horizontal_only", "color": "#000000", "thickness": "thin"},
"bullet_list": {"font_size": 11, "color": "#2F2F2F", "indent": 20},
"code_block": {"font": "Courier New", "font_size": 10, "color": "#2F2F2F", "background": "#F5F5F5"}
}
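# Example (illustrative only, not part of this commit): the "#RRGGBB" strings in the
# style dictionaries above are converted to python-docx RGBColor values the same way
# _render_json_table does further down, e.g.
#   value = "#4F4F4F".lstrip('#')
#   rgb = RGBColor(int(value[0:2], 16), int(value[2:4], 16), int(value[4:6], 16))
#   # rgb == RGBColor(0x4F, 0x4F, 0x4F)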
def _setup_basic_document_styles(self, doc: Document) -> None:
"""Set up basic document styles."""
try:
# Set default font
style = doc.styles['Normal']
font = style.font
font.name = 'Calibri'
font.size = Pt(11)
except Exception as e:
self.logger.warning(f"Could not set up basic document styles: {str(e)}")
def _clear_template_content(self, doc: Document) -> None:
"""Clear template content while preserving styles."""
try:
# Remove all paragraphs except keep the styles
for paragraph in list(doc.paragraphs):
# Keep the paragraph but clear its content
paragraph.clear()
# Remove all tables
for table in list(doc.tables):
table._element.getparent().remove(table._element)
except Exception as e:
self.logger.warning(f"Could not clear template content: {str(e)}")
def _render_json_section(self, doc: Document, section: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a single JSON section to DOCX using AI-generated styles."""
try:
section_type = section.get("type", "paragraph")
section_data = section.get("data", {})
if section_type == "table":
self._render_json_table(doc, section_data, styles)
elif section_type == "bullet_list":
self._render_json_bullet_list(doc, section_data, styles)
elif section_type == "heading":
self._render_json_heading(doc, section_data, styles)
elif section_type == "paragraph":
self._render_json_paragraph(doc, section_data, styles)
elif section_type == "code_block":
self._render_json_code_block(doc, section_data, styles)
elif section_type == "image":
self._render_json_image(doc, section_data, styles)
else:
# Fallback to paragraph for unknown types
self._render_json_paragraph(doc, section_data, styles)
except Exception as e:
self.logger.warning(f"Error rendering section {section.get('id', 'unknown')}: {str(e)}")
# Add error paragraph as fallback
error_para = doc.add_paragraph(f"[Error rendering section: {str(e)}]")
def _render_json_table(self, doc: Document, table_data: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a JSON table to DOCX using AI-generated styles."""
try:
headers = table_data.get("headers", [])
rows = table_data.get("rows", [])
if not headers or not rows:
return
# Create table
table = doc.add_table(rows=len(rows) + 1, cols=len(headers))
table.alignment = WD_TABLE_ALIGNMENT.CENTER
# Apply table borders based on AI style
border_style = styles["table_border"]["style"]
if border_style == "horizontal_only":
self._apply_horizontal_borders_only(table)
elif border_style == "grid":
table.style = 'Table Grid'
# else: no borders
# Add headers with AI-generated styling
header_row = table.rows[0]
header_style = styles["table_header"]
for i, header in enumerate(headers):
if i < len(header_row.cells):
cell = header_row.cells[i]
cell.text = str(header)
# Apply background color
bg_color = header_style["background"].lstrip('#')
self._set_cell_background(cell, RGBColor(int(bg_color[0:2], 16), int(bg_color[2:4], 16), int(bg_color[4:6], 16)))
# Apply text styling
for paragraph in cell.paragraphs:
paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER if header_style["align"] == "center" else WD_ALIGN_PARAGRAPH.LEFT
for run in paragraph.runs:
run.bold = header_style["bold"]
run.font.size = Pt(11)
text_color = header_style["text_color"].lstrip('#')
run.font.color.rgb = RGBColor(int(text_color[0:2], 16), int(text_color[2:4], 16), int(text_color[4:6], 16))
# Add data rows with AI-generated styling
cell_style = styles["table_cell"]
for row_idx, row_data in enumerate(rows):
if row_idx + 1 < len(table.rows):
table_row = table.rows[row_idx + 1]
for col_idx, cell_data in enumerate(row_data):
if col_idx < len(table_row.cells):
cell = table_row.cells[col_idx]
cell.text = str(cell_data)
# Apply text styling
for paragraph in cell.paragraphs:
paragraph.alignment = WD_ALIGN_PARAGRAPH.LEFT
for run in paragraph.runs:
run.font.size = Pt(10)
text_color = cell_style["text_color"].lstrip('#')
run.font.color.rgb = RGBColor(int(text_color[0:2], 16), int(text_color[2:4], 16), int(text_color[4:6], 16))
except Exception as e:
self.logger.warning(f"Error rendering table: {str(e)}")
def _apply_horizontal_borders_only(self, table) -> None:
"""Apply only horizontal borders to the table (no vertical borders)."""
try:
from docx.oxml.shared import OxmlElement, qn
# Get table properties
tbl_pr = table._element.find(qn('w:tblPr'))
if tbl_pr is None:
tbl_pr = OxmlElement('w:tblPr')
table._element.insert(0, tbl_pr)
# Remove existing borders
existing_borders = tbl_pr.find(qn('w:tblBorders'))
if existing_borders is not None:
tbl_pr.remove(existing_borders)
# Create new borders element
tbl_borders = OxmlElement('w:tblBorders')
# Top border
top_border = OxmlElement('w:top')
top_border.set(qn('w:val'), 'single')
top_border.set(qn('w:sz'), '4')
top_border.set(qn('w:space'), '0')
top_border.set(qn('w:color'), '000000')
tbl_borders.append(top_border)
# Bottom border
bottom_border = OxmlElement('w:bottom')
bottom_border.set(qn('w:val'), 'single')
bottom_border.set(qn('w:sz'), '4')
bottom_border.set(qn('w:space'), '0')
bottom_border.set(qn('w:color'), '000000')
tbl_borders.append(bottom_border)
# Left border - none
left_border = OxmlElement('w:left')
left_border.set(qn('w:val'), 'none')
tbl_borders.append(left_border)
# Right border - none
right_border = OxmlElement('w:right')
right_border.set(qn('w:val'), 'none')
tbl_borders.append(right_border)
# Inside horizontal border
inside_h_border = OxmlElement('w:insideH')
inside_h_border.set(qn('w:val'), 'single')
inside_h_border.set(qn('w:sz'), '4')
inside_h_border.set(qn('w:space'), '0')
inside_h_border.set(qn('w:color'), '000000')
tbl_borders.append(inside_h_border)
# Inside vertical border - none
inside_v_border = OxmlElement('w:insideV')
inside_v_border.set(qn('w:val'), 'none')
tbl_borders.append(inside_v_border)
tbl_pr.append(tbl_borders)
except Exception as e:
self.logger.warning(f"Could not apply horizontal borders: {str(e)}")
def _set_cell_background(self, cell, color: RGBColor) -> None:
"""Set the background color of a table cell."""
try:
from docx.oxml.shared import OxmlElement, qn
# Get cell properties
tc_pr = cell._element.find(qn('w:tcPr'))
if tc_pr is None:
tc_pr = OxmlElement('w:tcPr')
cell._element.insert(0, tc_pr)
# Remove existing shading
existing_shading = tc_pr.find(qn('w:shd'))
if existing_shading is not None:
tc_pr.remove(existing_shading)
# Create new shading element
shading = OxmlElement('w:shd')
shading.set(qn('w:val'), 'clear')
shading.set(qn('w:color'), 'auto')
# Convert RGBColor to hex string by unpacking RGB components
red, green, blue = color
hex_color = f"{red:02x}{green:02x}{blue:02x}"
shading.set(qn('w:fill'), hex_color)
tc_pr.append(shading)
except Exception as e:
self.logger.warning(f"Could not set cell background: {str(e)}")
def _render_json_bullet_list(self, doc: Document, list_data: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a JSON bullet list to DOCX using AI-generated styles."""
try:
items = list_data.get("items", [])
bullet_style = styles["bullet_list"]
for item in items:
if isinstance(item, str):
para = doc.add_paragraph(item, style='List Bullet')
elif isinstance(item, dict) and "text" in item:
para = doc.add_paragraph(item["text"], style='List Bullet')
except Exception as e:
self.logger.warning(f"Error rendering bullet list: {str(e)}")
def _render_json_heading(self, doc: Document, heading_data: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a JSON heading to DOCX using AI-generated styles."""
try:
level = heading_data.get("level", 1)
text = heading_data.get("text", "")
if text:
level = max(1, min(6, level))
doc.add_heading(text, level=level)
except Exception as e:
self.logger.warning(f"Error rendering heading: {str(e)}")
def _render_json_paragraph(self, doc: Document, paragraph_data: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a JSON paragraph to DOCX using AI-generated styles."""
try:
text = paragraph_data.get("text", "")
if text:
para = doc.add_paragraph(text)
except Exception as e:
self.logger.warning(f"Error rendering paragraph: {str(e)}")
def _render_json_code_block(self, doc: Document, code_data: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a JSON code block to DOCX using AI-generated styles."""
try:
code = code_data.get("code", "")
language = code_data.get("language", "")
if code:
if language:
lang_para = doc.add_paragraph(f"Code ({language}):")
lang_para.runs[0].bold = True
code_para = doc.add_paragraph(code)
for run in code_para.runs:
run.font.name = 'Courier New'
run.font.size = Pt(10)
except Exception as e:
self.logger.warning(f"Error rendering code block: {str(e)}")
def _render_json_image(self, doc: Document, image_data: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Render a JSON image to DOCX."""
try:
base64_data = image_data.get("base64Data", "")
alt_text = image_data.get("altText", "Image")
if base64_data:
image_bytes = base64.b64decode(base64_data)
doc.add_picture(io.BytesIO(image_bytes), width=Inches(4))
if alt_text:
caption_para = doc.add_paragraph(f"Figure: {alt_text}")
caption_para.runs[0].italic = True
except Exception as e:
self.logger.warning(f"Error rendering image: {str(e)}")
doc.add_paragraph(f"[Image: {image_data.get('altText', 'Image')}]")
def _extract_structure_from_prompt(self, user_prompt: str, title: str) -> Dict[str, Any]:
"""Extract document structure from user prompt."""
structure = {
@ -419,118 +836,217 @@ class DocxRenderer(BaseRenderer):
return '\n\n'.join(unique_sections)
def _process_tables(self, doc, content: str) -> str:
"""
Process tables in the content (both CSV and pipe-separated) and convert them to Word tables.
Returns the content with tables replaced by placeholders.
"""
import csv
import io
lines = content.split('\n')
processed_lines = []
i = 0
while i < len(lines):
line = lines[i].strip()
# Check if this line looks like a table (contains pipes or commas with multiple fields)
is_pipe_table = '|' in line and len(line.split('|')) >= 2
is_csv_table = ',' in line and len(line.split(',')) >= 2
if is_pipe_table or is_csv_table:
# Collect consecutive table lines
table_lines = []
j = i
# Determine separator and collect lines
separator = '|' if is_pipe_table else ','
while j < len(lines):
current_line = lines[j].strip()
if separator in current_line and len(current_line.split(separator)) >= 2:
table_lines.append(current_line)
j += 1
else:
break
if len(table_lines) >= 2: # At least header + 1 data row
# Create Word table
try:
if separator == '|':
# Process pipe-separated table
rows = []
for table_line in table_lines:
# Split by pipe and clean up
cells = [cell.strip() for cell in table_line.split('|')]
rows.append(cells)
else:
# Process CSV table
csv_content = '\n'.join(table_lines)
csv_reader = csv.reader(io.StringIO(csv_content))
rows = list(csv_reader)
if rows and len(rows[0]) > 0:
# Create Word table
table = doc.add_table(rows=len(rows), cols=len(rows[0]))
table.style = 'Table Grid'
# Populate table
for row_idx, row_data in enumerate(rows):
for col_idx, cell_data in enumerate(row_data):
if col_idx < len(table.rows[row_idx].cells):
table.rows[row_idx].cells[col_idx].text = cell_data.strip()
# Make header row bold
if row_idx == 0:
for cell in table.rows[row_idx].cells:
for paragraph in cell.paragraphs:
for run in paragraph.runs:
run.bold = True
# Add placeholder to mark where table was inserted
processed_lines.append(f"[TABLE_INSERTED_{len(processed_lines)}]")
# Skip the table lines
i = j
continue
except Exception as e:
# If table parsing fails, treat as regular text
pass
processed_lines.append(line)
i += 1
return '\n'.join(processed_lines)
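# Example (illustrative only): pipe-separated lines are converted into a Word table and
# replaced by a placeholder that _parse_and_format_content skips later, e.g.
#   remaining = self._process_tables(doc, "Name | Role\nAlice | Engineer\nBob | Analyst")
#   # remaining == "[TABLE_INSERTED_0]" and doc now contains a 3x2 table with a bold header row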
def _parse_and_format_content(self, doc, content: str, title: str):
"""Parse AI-generated structured content and format it as DOCX."""
"""Parse AI-generated content in standardized format and apply proper DOCX formatting."""
if not content:
return
# Add title
title_para = doc.add_heading(title, 0)
title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Process tables and replace them with placeholders
content = self._process_tables(doc, content)
# Add generation date
date_para = doc.add_paragraph(f"Generated: {self._format_timestamp()}")
date_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Add page break
doc.add_page_break()
# Parse content line by line in exact sequence
lines = content.split('\n')
for line in lines:
line = line.strip()
if not line:
# Empty line - add paragraph break
doc.add_paragraph()
continue
# Skip table placeholders (already processed)
if line.startswith('[TABLE_INSERTED_'):
continue
# Check if this is a Markdown heading (# ## ###)
if line.startswith('#'):
level = len(line) - len(line.lstrip('#'))
heading_text = line.lstrip('# ').strip()
doc.add_heading(heading_text, level=min(level, 3))
# Check if this is a numbered heading (1) Title, 2) Title, etc.)
elif re.match(r'^\d+\)\s+.+', line):
heading_text = re.sub(r'^\d+\)\s+', '', line)
doc.add_heading(heading_text, level=1)
# Check if this is a Markdown list item
elif line.startswith('- ') or re.match(r'^\d+\.\s+', line):
bullet_text = re.sub(r'^[-•]\s+|\d+\.\s+', '', line)
self._add_bullet_point(doc, bullet_text)
# Check if this is a code block
elif line.startswith('```'):
if not line.endswith('```'):
# Start of code block - collect until end
code_lines = [line]
continue
else:
# End of code block
if 'code_lines' in locals():
code_lines.append(line)
code_text = '\n'.join(code_lines)
para = doc.add_paragraph()
run = para.add_run(code_text)
run.font.name = 'Courier New'
del code_lines
# Regular paragraph
else:
self._add_paragraph_to_doc(doc, line)
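# Example (illustrative only) of the standardized content this parser expects, e.g.
#   # Quarterly Report
#   1) Revenue Overview
#   - Revenue grew year over year
#   Overall the quarter closed **ahead** of plan.
# renders as a level-1 heading, another level-1 heading, a bullet item, and a
# paragraph with "ahead" in bold, in that order.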
def _add_paragraph_to_doc(self, doc, text: str):
"""Add a paragraph to the document with proper formatting."""
if not text.strip():
return
# Check for Markdown formatting (**bold**, *italic*)
para = doc.add_paragraph()
# Split by bold markers
parts = text.split('**')
for i, part in enumerate(parts):
if i % 2 == 0:
# Regular text - check for italic
italic_parts = part.split('*')
for j, italic_part in enumerate(italic_parts):
if j % 2 == 0:
# Regular text
if italic_part:
para.add_run(italic_part)
else:
# Italic text
if italic_part:
run = para.add_run(italic_part)
run.italic = True
else:
# Bold text
if part:
run = para.add_run(part)
run.bold = True
def _add_bullet_point(self, doc, text: str):
"""Add a bullet point to the document."""
if not text.strip():
return
# Create paragraph with bullet style
para = doc.add_paragraph(text, style='List Bullet')
# Check for Markdown formatting in bullet point
if '**' in text or '*' in text:
# Clear the paragraph and rebuild with formatting
para.clear()
self._add_paragraph_to_doc(doc, text)
def _style_table(self, table):
"""Apply styling to the table."""
try:
# Style header row
if len(table.rows) > 0:
header_cells = table.rows[0].cells
for cell in header_cells:
for paragraph in cell.paragraphs:
for run in paragraph.runs:
run.bold = True
except Exception as e:
self.logger.warning(f"Could not style table: {str(e)}")
def _format_timestamp(self) -> str:
"""Format current timestamp for document generation."""
from datetime import datetime, UTC
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
"""Process a table row and add it to the document."""
if not line.strip():
return
# Clean the line - remove bullet point markers and bold markers
clean_line = line.strip()
if clean_line.startswith('- **'):
if clean_line.startswith(''):
clean_line = clean_line[1:] # Remove "•"
elif clean_line.startswith('- **'):
clean_line = clean_line[4:] # Remove "- **"
elif clean_line.startswith('- '):
clean_line = clean_line[2:] # Remove "- "

View file

@ -39,25 +39,28 @@ class ExcelRenderer(BaseRenderer):
"""Return only Excel-specific guidelines; global prompt is built centrally."""
return (
"EXCEL FORMAT GUIDELINES:\n"
"- Output one or more pipe-delimited tables with a single header row.\n"
"- Let user intent define columns; use clear names and ISO dates.\n"
"- Separate multiple tables by a single blank line.\n"
"- No markdown/HTML/code fences; tables only unless user explicitly asks for notes.\n"
"OUTPUT: Return ONLY pipe-delimited tables suitable for import."
"- Extract structured data from source documents into JSON format\n"
"- Focus on tabular data, lists, and structured information suitable for spreadsheets\n"
"- For tables: Extract headers and rows as separate arrays with clear column names\n"
"- For lists: Extract items with optional sub-items and metadata\n"
"- Structure content into sections with clear content types (table, list, paragraph)\n"
"- Use proper JSON structure with metadata, sections, and elements\n"
"- Ensure data is clean and ready for Excel conversion with proper formatting\n"
"OUTPUT: Return structured JSON that can be converted to Excel format."
)
async def render(self, extracted_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> Tuple[str, str]:
"""Render extracted JSON content to Excel format using AI-analyzed styling."""
try:
if not OPENPYXL_AVAILABLE:
# Fallback to CSV if openpyxl not available
from .csv_renderer import CsvRenderer
csv_renderer = CsvRenderer()
csv_content, _ = await csv_renderer.render(extracted_content, title, user_prompt, ai_service)
return csv_content, "text/csv"
# Generate Excel using AI-analyzed styling
excel_content = await self._generate_excel_from_json(extracted_content, title, user_prompt, ai_service)
return excel_content, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
@ -208,3 +211,355 @@ class ExcelRenderer(BaseRenderer):
except Exception as e:
self.logger.warning(f"Could not populate analysis sheet: {str(e)}")
async def _generate_excel_from_json(self, json_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> str:
"""Generate Excel content from structured JSON document using AI-generated styling."""
try:
# Get AI-generated styling definitions
styles = await self._get_excel_styles(user_prompt, ai_service)
# Validate JSON structure
if not isinstance(json_content, dict):
raise ValueError("JSON content must be a dictionary")
if "sections" not in json_content:
raise ValueError("JSON content must contain 'sections' field")
# Use title from JSON metadata if available, otherwise use provided title
document_title = json_content.get("metadata", {}).get("title", title)
# Create workbook
wb = Workbook()
# Remove default sheet
wb.remove(wb.active)
# Create sheets based on content
sheets = self._create_excel_sheets(wb, json_content, styles)
# Populate sheets with content
self._populate_excel_sheets(sheets, json_content, styles)
# Save to buffer
buffer = io.BytesIO()
wb.save(buffer)
buffer.seek(0)
# Convert to base64
excel_bytes = buffer.getvalue()
excel_base64 = base64.b64encode(excel_bytes).decode('utf-8')
return excel_base64
except Exception as e:
self.logger.error(f"Error generating Excel from JSON: {str(e)}")
raise Exception(f"Excel generation failed: {str(e)}")
async def _get_excel_styles(self, user_prompt: str, ai_service=None) -> Dict[str, Any]:
"""Simple AI call to get Excel styling definitions."""
if not ai_service:
return self._get_default_excel_styles()
try:
prompt = f"""
For this Excel document request: "{user_prompt}"
Provide styling definitions for Excel elements. Respond with ONLY JSON:
{{
"title": {{"font_size": 16, "color": "#1F4E79", "bold": true, "align": "center"}},
"heading": {{"font_size": 14, "color": "#2F2F2F", "bold": true, "align": "left"}},
"table_header": {{"background": "#4F4F4F", "text_color": "#FFFFFF", "bold": true, "align": "center"}},
"table_cell": {{"background": "#FFFFFF", "text_color": "#2F2F2F", "bold": false, "align": "left"}},
"bullet_list": {{"font_size": 11, "color": "#2F2F2F", "indent": 2}},
"paragraph": {{"font_size": 11, "color": "#2F2F2F", "bold": false, "align": "left"}},
"code_block": {{"font": "Courier New", "font_size": 10, "color": "#2F2F2F", "background": "#F5F5F5"}}
}}
CRITICAL: Table headers must have dark background with light text, table cells must have light background with dark text for readability.
"""
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
request_options = AiCallOptions()
request_options.operationType = OperationType.GENERAL
request = AiCallRequest(prompt=prompt, context="", options=request_options)
response = await ai_service.aiObjects.call(request)
import json
import re
# Clean and parse JSON
result = response.content.strip()
if result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
styles = json.loads(result)
# Validate and fix contrast issues
styles = self._validate_excel_styles_contrast(styles)
return styles
except Exception as e:
self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
return self._get_default_excel_styles()
def _validate_excel_styles_contrast(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and fix contrast issues in AI-generated styles."""
try:
# Fix table header contrast
if "table_header" in styles:
header = styles["table_header"]
bg_color = header.get("background", "#FFFFFF")
text_color = header.get("text_color", "#000000")
# If both are white or both are dark, fix it
if bg_color.upper() == "#FFFFFF" and text_color.upper() == "#FFFFFF":
header["background"] = "#4F4F4F"
header["text_color"] = "#FFFFFF"
elif bg_color.upper() == "#000000" and text_color.upper() == "#000000":
header["background"] = "#4F4F4F"
header["text_color"] = "#FFFFFF"
# Fix table cell contrast
if "table_cell" in styles:
cell = styles["table_cell"]
bg_color = cell.get("background", "#FFFFFF")
text_color = cell.get("text_color", "#000000")
# If both are white or both are dark, fix it
if bg_color.upper() == "#FFFFFF" and text_color.upper() == "#FFFFFF":
cell["background"] = "#FFFFFF"
cell["text_color"] = "#2F2F2F"
elif bg_color.upper() == "#000000" and text_color.upper() == "#000000":
cell["background"] = "#FFFFFF"
cell["text_color"] = "#2F2F2F"
return styles
except Exception as e:
self.logger.warning(f"Style validation failed: {str(e)}")
return self._get_default_excel_styles()
def _get_default_excel_styles(self) -> Dict[str, Any]:
"""Default Excel styles."""
return {
"title": {"font_size": 16, "color": "#1F4E79", "bold": True, "align": "center"},
"heading": {"font_size": 14, "color": "#2F2F2F", "bold": True, "align": "left"},
"table_header": {"background": "#4F4F4F", "text_color": "#FFFFFF", "bold": True, "align": "center"},
"table_cell": {"background": "#FFFFFF", "text_color": "#2F2F2F", "bold": False, "align": "left"},
"bullet_list": {"font_size": 11, "color": "#2F2F2F", "indent": 2},
"paragraph": {"font_size": 11, "color": "#2F2F2F", "bold": False, "align": "left"},
"code_block": {"font": "Courier New", "font_size": 10, "color": "#2F2F2F", "background": "#F5F5F5"}
}
def _create_excel_sheets(self, wb: Workbook, json_content: Dict[str, Any], styles: Dict[str, Any]) -> Dict[str, Any]:
"""Create Excel sheets based on content structure and user intent."""
sheets = {}
# Get sheet names from AI styles or generate based on content
sheet_names = styles.get("sheet_names", self._generate_sheet_names_from_content(json_content))
# Create sheets
for i, sheet_name in enumerate(sheet_names):
if i == 0 and wb.worksheets:
sheet = wb.active
sheet.title = sheet_name
else:
sheet = wb.create_sheet(sheet_name, i)
sheets[sheet_name.lower()] = sheet
return sheets
def _generate_sheet_names_from_content(self, json_content: Dict[str, Any]) -> List[str]:
"""Generate sheet names based on actual content structure."""
sections = json_content.get("sections", [])
# If no sections, create a single sheet
if not sections:
return ["Content"]
# Generate sheet names based on content types
sheet_names = []
# Always start with a main content sheet
document_title = json_content.get("metadata", {}).get("title", "Document")
sheet_names.append(document_title[:31]) # Excel sheet name limit
# Add sheets based on content types found
content_types = set()
for section in sections:
content_type = section.get("content_type", "paragraph")
content_types.add(content_type)
# Create sheets for different content types if we have multiple types
if len(content_types) > 1:
if "table" in content_types:
sheet_names.append("Tables")
if "list" in content_types:
sheet_names.append("Lists")
if "paragraph" in content_types or "heading" in content_types:
sheet_names.append("Text")
# Limit to 4 sheets maximum
return sheet_names[:4]
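# Example (illustrative only): a document titled "Sales Report" whose sections contain
# both tables and lists yields ["Sales Report", "Tables", "Lists"]; with a single
# content type only the title sheet is created.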
def _populate_excel_sheets(self, sheets: Dict[str, Any], json_content: Dict[str, Any], styles: Dict[str, Any]) -> None:
"""Populate Excel sheets with content from JSON based on actual sheet names."""
try:
# Get the actual sheet names that were created
sheet_names = list(sheets.keys())
if not sheet_names:
return
# Populate the first sheet with all content
first_sheet_name = sheet_names[0]
self._populate_main_sheet(sheets[first_sheet_name], json_content, styles)
# If we have multiple sheets, distribute content by type
if len(sheet_names) > 1:
self._populate_content_type_sheets(sheets, json_content, styles, sheet_names[1:])
except Exception as e:
self.logger.warning(f"Could not populate Excel sheets: {str(e)}")
def _populate_main_sheet(self, sheet, json_content: Dict[str, Any], styles: Dict[str, Any]):
"""Populate the main sheet with document overview and all content."""
try:
# Document title
document_title = json_content.get("metadata", {}).get("title", "Generated Report")
sheet['A1'] = document_title
title_style = styles["title"]
sheet['A1'].font = Font(size=title_style["font_size"], bold=title_style["bold"], color=title_style["color"].lstrip('#'))
sheet['A1'].alignment = Alignment(horizontal=title_style["align"])
# Generation info
sheet['A3'] = "Generated:"
sheet['B3'] = self._format_timestamp()
sheet['A4'] = "Status:"
sheet['B4'] = "Generated Successfully"
# Document metadata
metadata = json_content.get("metadata", {})
if metadata:
sheet['A6'] = "Document Information:"
sheet['A6'].font = Font(bold=True)
row = 7
for key, value in metadata.items():
if key != "title":
sheet[f'A{row}'] = f"{key.title()}:"
sheet[f'B{row}'] = str(value)
row += 1
# Content overview
sections = json_content.get("sections", [])
sheet[f'A{row + 1}'] = "Content Overview:"
sheet[f'A{row + 1}'].font = Font(bold=True)
row += 2
sheet[f'A{row}'] = f"Total Sections: {len(sections)}"
# Count different content types
content_types = {}
for section in sections:
content_type = section.get("content_type", "unknown")
content_types[content_type] = content_types.get(content_type, 0) + 1
for content_type, count in content_types.items():
row += 1
sheet[f'A{row}'] = f"{content_type.title()} Sections: {count}"
# Add all content to this sheet
row += 2
for section in sections:
row = self._add_section_to_sheet(sheet, section, styles, row)
row += 1 # Empty row between sections
# Auto-adjust column widths
sheet.column_dimensions['A'].width = 20
sheet.column_dimensions['B'].width = 30
except Exception as e:
self.logger.warning(f"Could not populate main sheet: {str(e)}")
def _populate_content_type_sheets(self, sheets: Dict[str, Any], json_content: Dict[str, Any], styles: Dict[str, Any], sheet_names: List[str]):
"""Populate additional sheets based on content types."""
try:
sections = json_content.get("sections", [])
for sheet_name in sheet_names:
if sheet_name not in sheets:
continue
sheet = sheets[sheet_name]
sheet_title = sheet_name.title()
sheet['A1'] = sheet_title
sheet['A1'].font = Font(size=16, bold=True)
row = 3
# Filter sections by content type
if sheet_name == "tables":
filtered_sections = [s for s in sections if s.get("content_type") == "table"]
elif sheet_name == "lists":
filtered_sections = [s for s in sections if s.get("content_type") == "list"]
elif sheet_name == "text":
filtered_sections = [s for s in sections if s.get("content_type") in ["paragraph", "heading"]]
else:
filtered_sections = sections
for section in filtered_sections:
row = self._add_section_to_sheet(sheet, section, styles, row)
row += 1 # Empty row between sections
# Auto-adjust column widths
for col in range(1, 6):
sheet.column_dimensions[get_column_letter(col)].width = 20
except Exception as e:
self.logger.warning(f"Could not populate content type sheets: {str(e)}")
def _add_section_to_sheet(self, sheet, section: Dict[str, Any], styles: Dict[str, Any], start_row: int) -> int:
"""Add a section to a sheet and return the next row."""
try:
# Add section title
section_title = section.get("title")
if section_title:
sheet[f'A{start_row}'] = f"# {section_title}"
sheet[f'A{start_row}'].font = Font(bold=True)
start_row += 1
# Process section elements
elements = section.get("elements", [])
content_type = section.get("content_type", "paragraph")
for element in elements:
if content_type == "table":
start_row = self._add_table_to_excel(sheet, element, styles, start_row)
elif content_type == "list":
start_row = self._add_list_to_excel(sheet, element, styles, start_row)
elif content_type == "paragraph":
start_row = self._add_paragraph_to_excel(sheet, element, styles, start_row)
elif content_type == "heading":
start_row = self._add_heading_to_excel(sheet, element, styles, start_row)
else:
start_row = self._add_paragraph_to_excel(sheet, element, styles, start_row)
return start_row
except Exception as e:
self.logger.warning(f"Could not add section to sheet: {str(e)}")
return start_row + 1
def _format_timestamp(self) -> str:
"""Format current timestamp for document generation."""
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")

View file

@ -20,13 +20,15 @@ class PptxRenderer(BaseRenderer):
"""Get list of supported output formats."""
return ["pptx", "ppt"]
async def render(self, content: str, title: str = "Generated Presentation", **kwargs) -> Tuple[str, str]:
async def render(self, extracted_content: Dict[str, Any], title: str, user_prompt: str = None, ai_service=None) -> Tuple[str, str]:
"""
Render content as PowerPoint presentation.
Render content as PowerPoint presentation from JSON data.
Args:
content: Content to render as presentation
extracted_content: JSON content to render as presentation
title: Title for the presentation
user_prompt: User prompt for AI styling
ai_service: AI service for styling
**kwargs: Additional rendering options
Returns:
@ -43,16 +45,24 @@ class PptxRenderer(BaseRenderer):
# Create new presentation
prs = Presentation()
# Get AI-generated styling definitions
styles = await self._get_pptx_styles(user_prompt, ai_service)
# Set slide size based on user intent (default to 16:9)
slide_size = styles.get("slide_size", "16:9")
if slide_size == "4:3":
prs.slide_width = Inches(10)
prs.slide_height = Inches(7.5)
else: # Default to 16:9
prs.slide_width = Inches(13.33)
prs.slide_height = Inches(7.5)
# Generate slides from JSON content
slides_data = await self._parse_json_to_slides(extracted_content, title, styles)
logger.info(f"Parsed {len(slides_data)} slides from JSON content")
# Debug: Show first 200 chars of content
logger.info(f"JSON content preview: {str(extracted_content)[:200]}...")
for i, slide_data in enumerate(slides_data):
logger.info(f"Slide {i+1}: '{slide_data.get('title', 'No title')}' - {len(slide_data.get('content', ''))} chars")
@ -63,8 +73,9 @@ class PptxRenderer(BaseRenderer):
else:
logger.warning(f" ⚠️ Slide {i+1} has NO content!")
# Create slide with appropriate layout based on content
slide_layout_index = self._get_slide_layout_index(slide_data, styles)
slide_layout = prs.slide_layouts[slide_layout_index]
slide = prs.slides.add_slide(slide_layout)
# Set title
@ -247,6 +258,446 @@ class PptxRenderer(BaseRenderer):
"""Get MIME type for rendered output."""
return self.output_mime_type
def getExtractionPrompt(self, user_prompt: str, title: str) -> str:
"""Return only PowerPoint-specific guidelines; global prompt is built centrally."""
return (
"POWERPOINT FORMAT GUIDELINES:\n"
"- Extract structured data from source documents into JSON format\n"
"- Focus on presentation-ready content with clear sections and visual elements\n"
"- For tables: Extract headers and rows as separate arrays suitable for slides\n"
"- For lists: Extract items with optional sub-items for bullet points\n"
"- Structure content into sections with clear content types (heading, paragraph, table, list)\n"
"- Use proper JSON structure with metadata, sections, and elements\n"
"- Ensure content is concise and suitable for slide presentation\n"
"OUTPUT: Return structured JSON that can be converted to PowerPoint slides."
)
async def _get_pptx_styles(self, user_prompt: str, ai_service=None) -> Dict[str, Any]:
"""Simple AI call to get PowerPoint styling definitions."""
if not ai_service:
return self._get_default_pptx_styles()
try:
prompt = f"""
For this PowerPoint presentation request: "{user_prompt}"
Provide styling definitions for PowerPoint elements. Respond with ONLY JSON:
{{
"title": {{"font_size": 44, "color": "#1F4E79", "bold": true, "align": "center"}},
"heading": {{"font_size": 32, "color": "#2F2F2F", "bold": true, "align": "left"}},
"subheading": {{"font_size": 24, "color": "#4F4F4F", "bold": true, "align": "left"}},
"paragraph": {{"font_size": 18, "color": "#2F2F2F", "bold": false, "align": "left"}},
"bullet_list": {{"font_size": 18, "color": "#2F2F2F", "indent": 20}},
"table_header": {{"font_size": 16, "color": "#FFFFFF", "bold": true, "background": "#4F4F4F"}},
"table_cell": {{"font_size": 14, "color": "#2F2F2F", "bold": false, "background": "#FFFFFF"}},
"slide_size": "16:9",
"content_per_slide": "concise"
}}
CRITICAL: PowerPoint text must be large enough to read from a distance. Minimum font size should be 14pt for body text.
"""
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
request_options = AiCallOptions()
request_options.operationType = OperationType.GENERAL
request = AiCallRequest(prompt=prompt, context="", options=request_options)
response = await ai_service.aiObjects.call(request)
import json
import re
# Clean and parse JSON
result = response.content.strip()
if result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
styles = json.loads(result)
# Validate font sizes for PowerPoint readability
styles = self._validate_pptx_styles_readability(styles)
return styles
except Exception as e:
logger.warning(f"AI styling failed: {str(e)}, using defaults")
return self._get_default_pptx_styles()
def _validate_pptx_styles_readability(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and fix readability issues in AI-generated styles."""
try:
# Ensure minimum font sizes for PowerPoint readability
min_font_sizes = {
"title": 36,
"heading": 24,
"subheading": 20,
"paragraph": 14,
"bullet_list": 14,
"table_header": 12,
"table_cell": 12
}
for style_name, min_size in min_font_sizes.items():
if style_name in styles:
current_size = styles[style_name].get("font_size", 12)
if current_size < min_size:
styles[style_name]["font_size"] = min_size
return styles
except Exception as e:
logger.warning(f"Style validation failed: {str(e)}")
return self._get_default_pptx_styles()
def _get_default_pptx_styles(self) -> Dict[str, Any]:
"""Default PowerPoint styles."""
return {
"title": {"font_size": 44, "color": "#1F4E79", "bold": True, "align": "center"},
"heading": {"font_size": 32, "color": "#2F2F2F", "bold": True, "align": "left"},
"subheading": {"font_size": 24, "color": "#4F4F4F", "bold": True, "align": "left"},
"paragraph": {"font_size": 18, "color": "#2F2F2F", "bold": False, "align": "left"},
"bullet_list": {"font_size": 18, "color": "#2F2F2F", "indent": 20},
"table_header": {"font_size": 16, "color": "#FFFFFF", "bold": True, "background": "#4F4F4F"},
"table_cell": {"font_size": 14, "color": "#2F2F2F", "bold": False, "background": "#FFFFFF"},
"slide_size": "16:9",
"content_per_slide": "concise"
}
async def _parse_json_to_slides(self, json_content: Dict[str, Any], title: str, styles: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Parse JSON content into slide data structure.
Args:
json_content: JSON content to parse
title: Presentation title
styles: AI-generated styles
Returns:
List of slide data dictionaries
"""
slides = []
try:
# Validate JSON structure
if not isinstance(json_content, dict):
raise ValueError("JSON content must be a dictionary")
if "sections" not in json_content:
raise ValueError("JSON content must contain 'sections' field")
# Use title from JSON metadata if available, otherwise use provided title
document_title = json_content.get("metadata", {}).get("title", title)
# Create title slide
slides.append({
"title": document_title,
"content": "Generated by PowerOn AI System\n\n" + self._format_timestamp()
})
# Process sections into slides based on content and user intent
sections = json_content.get("sections", [])
slides.extend(self._create_slides_from_sections(sections, styles))
# If no content slides were created, create a default content slide
if len(slides) == 1: # Only title slide
slides.append({
"title": "Content Overview",
"content": "No structured content found in the source documents.\n\nPlease check the source documents and try again."
})
return slides
except Exception as e:
logger.error(f"Error parsing JSON to slides: {str(e)}")
# Return minimal fallback slides
return [
{
"title": title,
"content": "Error parsing content for presentation"
}
]
def _create_slide_from_section(self, section: Dict[str, Any], styles: Dict[str, Any]) -> Dict[str, Any]:
"""Create a slide from a JSON section."""
try:
section_title = section.get("title", "Untitled Section")
content_type = section.get("content_type", "paragraph")
elements = section.get("elements", [])
# Build slide content based on section type
content_parts = []
for element in elements:
if content_type == "table":
content_parts.append(self._format_table_for_slide(element))
elif content_type == "list":
content_parts.append(self._format_list_for_slide(element))
elif content_type == "heading":
content_parts.append(self._format_heading_for_slide(element))
elif content_type == "paragraph":
content_parts.append(self._format_paragraph_for_slide(element))
elif content_type == "code":
content_parts.append(self._format_code_for_slide(element))
else:
content_parts.append(self._format_paragraph_for_slide(element))
# Combine content parts
slide_content = "\n\n".join(filter(None, content_parts))
return {
"title": section_title,
"content": slide_content
}
except Exception as e:
logger.warning(f"Error creating slide from section: {str(e)}")
return None
def _format_table_for_slide(self, table_data: Dict[str, Any]) -> str:
"""Format table data for slide presentation."""
try:
headers = table_data.get("headers", [])
rows = table_data.get("rows", [])
if not headers:
return ""
# Create table representation
table_lines = []
# Add headers
header_line = " | ".join(str(h) for h in headers)
table_lines.append(header_line)
# Add separator
separator = "-" * len(header_line)
table_lines.append(separator)
# Add data rows (limit based on content density)
max_rows = 5 # Default limit
for row in rows[:max_rows]:
row_line = " | ".join(str(cell) for cell in row)
table_lines.append(row_line)
if len(rows) > max_rows:
table_lines.append(f"... and {len(rows) - max_rows} more rows")
return "\n".join(table_lines)
except Exception as e:
logger.warning(f"Error formatting table for slide: {str(e)}")
return ""
def _format_list_for_slide(self, list_data: Dict[str, Any]) -> str:
"""Format list data for slide presentation."""
try:
items = list_data.get("items", [])
if not items:
return ""
# Create list representation
list_lines = []
for item in items:
if isinstance(item, dict):
text = item.get("text", "")
list_lines.append(f"{text}")
# Add subitems (limit to 3 for readability)
subitems = item.get("subitems", [])[:3]
for subitem in subitems:
if isinstance(subitem, dict):
list_lines.append(f" - {subitem.get('text', '')}")
else:
list_lines.append(f" - {subitem}")
else:
list_lines.append(f"{str(item)}")
return "\n".join(list_lines)
except Exception as e:
logger.warning(f"Error formatting list for slide: {str(e)}")
return ""
def _format_heading_for_slide(self, heading_data: Dict[str, Any]) -> str:
"""Format heading data for slide presentation."""
try:
text = heading_data.get("text", "")
level = heading_data.get("level", 1)
if text:
return f"{'#' * level} {text}"
return ""
except Exception as e:
logger.warning(f"Error formatting heading for slide: {str(e)}")
return ""
def _format_paragraph_for_slide(self, paragraph_data: Dict[str, Any]) -> str:
"""Format paragraph data for slide presentation."""
try:
text = paragraph_data.get("text", "")
if text:
# Limit paragraph length based on content density
max_length = 200 # Default limit
if len(text) > max_length:
text = text[:max_length] + "..."
return text
return ""
except Exception as e:
logger.warning(f"Error formatting paragraph for slide: {str(e)}")
return ""
def _format_code_for_slide(self, code_data: Dict[str, Any]) -> str:
"""Format code data for slide presentation."""
try:
code = code_data.get("code", "")
language = code_data.get("language", "")
if code:
# Limit code length based on content density
max_length = 100 # Default limit
if len(code) > max_length:
code = code[:max_length] + "..."
if language:
return f"Code ({language}):\n{code}"
else:
return f"Code:\n{code}"
return ""
except Exception as e:
logger.warning(f"Error formatting code for slide: {str(e)}")
return ""
def _get_slide_layout_index(self, slide_data: Dict[str, Any], styles: Dict[str, Any]) -> int:
"""Determine the best slide layout based on content."""
try:
content = slide_data.get("content", "")
title = slide_data.get("title", "")
# Check if it's a title slide (first slide)
if not content or "Generated by PowerOn AI System" in content:
return 0 # Title slide layout
# Check content type to determine layout
if "|" in content and "-" in content:
# Has both tables and lists - use content with caption
return 2
elif "|" in content:
# Has tables - use content layout
return 1
elif content.count("") > 2:
# Has many bullet points - use content layout
return 1
else:
# Default to title and content
return 1
except Exception as e:
logger.warning(f"Error determining slide layout: {str(e)}")
return 1 # Default to title and content
def _create_slides_from_sections(self, sections: List[Dict[str, Any]], styles: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Create slides from sections based on content density and user intent."""
try:
slides = []
content_per_slide = styles.get("content_per_slide", "concise")
for section in sections:
section_slides = self._create_section_slides(section, styles, content_per_slide)
slides.extend(section_slides)
return slides
except Exception as e:
logger.warning(f"Error creating slides from sections: {str(e)}")
return []
def _create_section_slides(self, section: Dict[str, Any], styles: Dict[str, Any], content_per_slide: str) -> List[Dict[str, Any]]:
"""Create one or more slides from a section based on content density."""
try:
section_title = section.get("title", "Untitled Section")
content_type = section.get("content_type", "paragraph")
elements = section.get("elements", [])
if not elements:
return [{
"title": section_title,
"content": "No content available for this section."
}]
# Determine how to split content based on type and density
if content_per_slide == "detailed" and len(elements) > 3:
# Split large sections into multiple slides
return self._split_section_into_multiple_slides(section_title, elements, content_type)
else:
# Create single slide for section
slide_data = self._create_slide_from_section(section, styles)
return [slide_data] if slide_data else []
except Exception as e:
logger.warning(f"Error creating section slides: {str(e)}")
return []
def _split_section_into_multiple_slides(self, section_title: str, elements: List[Dict[str, Any]], content_type: str) -> List[Dict[str, Any]]:
"""Split a large section into multiple slides."""
try:
slides = []
max_elements_per_slide = 3
for i in range(0, len(elements), max_elements_per_slide):
slide_elements = elements[i:i + max_elements_per_slide]
# Create slide title
if i == 0:
slide_title = section_title
else:
slide_title = f"{section_title} (Part {i//max_elements_per_slide + 1})"
# Build content for this slide
content_parts = []
for element in slide_elements:
if content_type == "table":
content_parts.append(self._format_table_for_slide(element))
elif content_type == "list":
content_parts.append(self._format_list_for_slide(element))
elif content_type == "heading":
content_parts.append(self._format_heading_for_slide(element))
elif content_type == "paragraph":
content_parts.append(self._format_paragraph_for_slide(element))
elif content_type == "code":
content_parts.append(self._format_code_for_slide(element))
else:
content_parts.append(self._format_paragraph_for_slide(element))
slide_content = "\n\n".join(filter(None, content_parts))
slides.append({
"title": slide_title,
"content": slide_content
})
return slides
except Exception as e:
logger.warning(f"Error splitting section into slides: {str(e)}")
return []
def _format_timestamp(self) -> str:
"""Format current timestamp for presentation generation."""
from datetime import datetime, UTC
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")

View file

@ -0,0 +1,270 @@
"""
JSON Schema definitions for AI-generated document structures.
This module provides schemas that guide AI to generate structured JSON output.
"""
from typing import Dict, Any
def get_document_subJsonSchema() -> Dict[str, Any]:
"""Get the JSON schema for structured document generation."""
return {
"type": "object",
"required": ["metadata", "sections"],
"properties": {
"metadata": {
"type": "object",
"required": ["title"],
"properties": {
"title": {"type": "string", "description": "Document title"},
"author": {"type": "string", "description": "Document author (optional)"},
"source_documents": {
"type": "array",
"items": {"type": "string"},
"description": "List of source document IDs"
},
"extraction_method": {
"type": "string",
"default": "ai_extraction",
"description": "Method used for extraction"
}
}
},
"sections": {
"type": "array",
"description": "Document sections containing structured content",
"items": {
"type": "object",
"required": ["id", "content_type", "elements", "order"],
"properties": {
"id": {"type": "string", "description": "Unique section identifier"},
"title": {"type": "string", "description": "Section title (optional)"},
"content_type": {
"type": "string",
"enum": ["table", "list", "paragraph", "heading", "code", "image", "mixed"],
"description": "Primary content type of this section"
},
"elements": {
"type": "array",
"description": "Content elements in this section",
"items": {
"oneOf": [
{"$ref": "#/definitions/table"},
{"$ref": "#/definitions/bullet_list"},
{"$ref": "#/definitions/paragraph"},
{"$ref": "#/definitions/heading"},
{"$ref": "#/definitions/code_block"}
]
}
},
"order": {"type": "integer", "description": "Section order in document"},
"metadata": {
"type": "object",
"description": "Additional section metadata"
}
}
}
},
"summary": {
"type": "string",
"description": "Document summary (optional)"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Document tags for categorization"
}
},
"definitions": {
"table": {
"type": "object",
"required": ["headers", "rows"],
"properties": {
"headers": {
"type": "array",
"items": {"type": "string"},
"description": "Table column headers"
},
"rows": {
"type": "array",
"items": {
"type": "array",
"items": {"type": "string"}
},
"description": "Table data rows"
},
"caption": {
"type": "string",
"description": "Table caption (optional)"
}
}
},
"bullet_list": {
"type": "object",
"required": ["items"],
"properties": {
"items": {
"type": "array",
"items": {
"type": "object",
"required": ["text"],
"properties": {
"text": {"type": "string", "description": "List item text"},
"subitems": {
"type": "array",
"items": {"$ref": "#/definitions/list_item"},
"description": "Nested sub-items (optional)"
}
}
},
"description": "List items"
},
"list_type": {
"type": "string",
"enum": ["bullet", "numbered", "checklist"],
"default": "bullet",
"description": "Type of list"
}
}
},
"list_item": {
"type": "object",
"required": ["text"],
"properties": {
"text": {"type": "string", "description": "List item text"},
"subitems": {
"type": "array",
"items": {"$ref": "#/definitions/list_item"},
"description": "Nested sub-items (optional)"
}
}
},
"paragraph": {
"type": "object",
"required": ["text"],
"properties": {
"text": {"type": "string", "description": "Paragraph text"},
"formatting": {
"type": "object",
"description": "Text formatting (bold, italic, etc.)"
}
}
},
"heading": {
"type": "object",
"required": ["text", "level"],
"properties": {
"text": {"type": "string", "description": "Heading text"},
"level": {
"type": "integer",
"minimum": 1,
"maximum": 6,
"description": "Heading level (1-6)"
}
}
},
"code_block": {
"type": "object",
"required": ["code"],
"properties": {
"code": {"type": "string", "description": "Code content"},
"language": {"type": "string", "description": "Programming language (optional)"}
}
}
}
}
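# Example (illustrative only): a minimal instance that satisfies this schema.
#   {
#     "metadata": {"title": "Quarterly Report"},
#     "sections": [
#       {
#         "id": "section_1",
#         "content_type": "table",
#         "order": 1,
#         "elements": [
#           {"headers": ["Region", "Revenue"], "rows": [["EMEA", "1.2M"]]}
#         ]
#       }
#     ]
#   }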
def get_extraction_prompt_template() -> str:
"""Get the template for AI extraction prompts that request JSON output."""
return """
You are extracting structured content from documents. Your task is to analyze the provided content and generate a structured JSON document.
IMPORTANT: You must respond with valid JSON only. No additional text, explanations, or formatting outside the JSON structure.
JSON Schema Requirements:
- Extract the actual data from the source documents
- If content is a table, extract it as a table with headers and rows
- If content is a list, extract it as a structured list with items
- If content is text, extract it as paragraphs or headings
- Preserve the original structure and data - do not summarize or interpret
- Use the exact JSON schema provided
Content Types to Extract:
1. Tables: Extract all rows and columns with proper headers
2. Lists: Extract all items with proper nesting
3. Headings: Extract with appropriate levels
4. Paragraphs: Extract as structured text
5. Code: Extract code blocks with language identification
Return only the JSON structure following the schema. Do not include any text before or after the JSON.
"""
def get_generation_prompt_template() -> str:
"""Get the template for AI generation prompts that work with JSON input."""
return """
You are generating a document from structured JSON data. Your task is to create a well-formatted document based on the provided structured content.
IMPORTANT: You must respond with valid JSON only, following the document schema.
Generation Guidelines:
- Use the provided JSON structure as the foundation
- Enhance the content with proper formatting and organization
- Ensure logical flow and readability
- Maintain the original data integrity
- Add appropriate headings and sections
- Organize content in a logical sequence
Content Enhancement:
- Tables: Ensure proper headers and data alignment
- Lists: Use appropriate list types (bullet, numbered, checklist)
- Headings: Use appropriate heading levels for hierarchy
- Paragraphs: Ensure proper text flow and formatting
- Code: Preserve code blocks with proper language identification
Return only the enhanced JSON structure following the schema. Do not include any text before or after the JSON.
"""
def validate_json_document(json_data: Dict[str, Any]) -> bool:
"""Validate that the JSON data follows the document schema."""
try:
# Basic validation - check required fields
if not isinstance(json_data, dict):
return False
if "metadata" not in json_data or "sections" not in json_data:
return False
metadata = json_data["metadata"]
if not isinstance(metadata, dict) or "title" not in metadata:
return False
sections = json_data["sections"]
if not isinstance(sections, list):
return False
# Validate each section
for section in sections:
if not isinstance(section, dict):
return False
required_fields = ["id", "content_type", "elements", "order"]
for field in required_fields:
if field not in section:
return False
# Validate content_type
valid_types = ["table", "list", "paragraph", "heading", "code", "image", "mixed"]
if section["content_type"] not in valid_types:
return False
# Validate elements
if not isinstance(section["elements"], list):
return False
return True
except Exception:
return False
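
# Illustrative usage sketch (hypothetical; not part of the module's public API): builds a
# minimal document that satisfies validate_json_document and checks it. Note that the
# validator requires "order" alongside "id", "content_type", and "elements".
if __name__ == "__main__":
    sample_document = {
        "metadata": {"title": "Sample Document"},
        "sections": [
            {
                "id": "section_1",
                "title": "Overview",
                "content_type": "paragraph",
                "elements": [{"text": "A short paragraph used to exercise the validator."}],
                "order": 1,
            }
        ],
    }
    print(f"Sample document valid: {validate_json_document(sample_document)}")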

View file

@@ -0,0 +1,234 @@
"""
Centralized prompt builder for document generation across formats.
Builds robust prompts that:
- Accept any user intent (no fixed structure assumptions)
- Use AI, when available, to separate extraction intent from generation requirements
- Require the model to return standardized JSON following the document schema, using real data from the source documents
"""
from typing import Protocol
class _RendererLike(Protocol):
def getExtractionPrompt(self, user_prompt: str, title: str) -> str: # returns only format-specific guidelines
...
async def buildExtractionPrompt(
outputFormat: str,
renderer: _RendererLike,
userPrompt: str,
title: str,
aiService=None
) -> str:
"""
Build the final extraction prompt by combining:
- Parsed extraction intent from user prompt (using AI)
- Generic cross-format instructions (filename header + real-data policy)
- Format-specific guidelines snippet provided by the renderer
The AI must place a single filename header at the very top:
FILENAME: <safe-file-name-with-extension>
followed by a blank line and then ONLY the document content according to the target format.
"""
# Parse user prompt to separate extraction intent from generation format using AI
extractionIntent = await _parseExtractionIntent(userPrompt, outputFormat, aiService)
# Load the shared JSON schema for structured output (not yet injected into the
# prompt; the inline example below currently stands in for it)
from .subJsonSchema import get_document_subJsonSchema
jsonSchema = get_document_subJsonSchema()
# Generic block for JSON extraction
genericIntro = f"""
{extractionIntent}
You are extracting structured content from documents and must respond with valid JSON only.
IMPORTANT: You must respond with valid JSON only. No additional text, explanations, or formatting outside the JSON structure.
Extract the actual data from the source documents and structure it as JSON with this format:
{{
"metadata": {{
"title": "Document Title",
"version": "1.0"
}},
"sections": [
{{
"id": "section_1",
"type": "heading",
"data": {{
"level": 1,
"text": "Heading Text"
}}
}},
{{
"id": "section_2",
"type": "table",
"data": {{
"headers": ["Column1", "Column2"],
"rows": [["Data1", "Data2"], ["Data3", "Data4"]]
}}
}},
{{
"id": "section_3",
"type": "bullet_list",
"data": {{
"items": ["Item 1", "Item 2", "Item 3"]
}}
}},
{{
"id": "section_4",
"type": "paragraph",
"data": {{
"text": "Paragraph content here"
}}
}}
]
}}
Content Types to Extract:
1. Tables: Extract all rows and columns with proper headers
2. Lists: Extract all items with proper nesting
3. Headings: Extract with appropriate levels
4. Paragraphs: Extract as structured text
5. Code: Extract code blocks with language identification
Return only the JSON structure with actual data from the documents. Do not include any text before or after the JSON.
""".strip()
# Final assembly
finalPrompt = genericIntro
# Debug output
print(f"🔍 DEBUG: Extraction Prompt: {finalPrompt}")
print(f"🔍 DEBUG: Extraction Intent: {extractionIntent}")
return finalPrompt
async def buildGenerationPrompt(
outputFormat: str,
userPrompt: str,
title: str,
aiService=None
) -> str:
"""
Use AI to build the generation prompt based on user intent and format requirements.
Focus on what's important for the user and how to structure the content.
"""
if not aiService:
# Fallback if no AI service available
return f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content."
try:
# Protect userPrompt from injection
safeUserPrompt = userPrompt.replace('"', '\\"').replace("'", "\\'").replace('\n', ' ').replace('\r', ' ')
# AI call to generate the appropriate generation prompt
generationPromptRequest = f"""
Based on this user request, create a detailed generation prompt for creating a {outputFormat} document.
User request: "{safeUserPrompt}"
Document title: "{title}"
Output format: {outputFormat}
Create a generation prompt that:
1. Identifies what content is most important for the user
2. Specifies how to structure and organize the content
3. Includes any specific formatting or presentation requirements
4. Preserves any language requirements
5. Ensures the document meets the user's needs
IMPORTANT: Always generate content in STANDARDIZED JSON FORMAT. In your response, include the exact text "PLACEHOLDER_FOR_FORMAT_RULES" where specific format rules will be inserted afterwards automatically.
Return only the generation prompt, starting with "Generate a {outputFormat} document that..."
"""
# Call AI service to generate the prompt
print(f"🔍 DEBUG: Calling AI for generation prompt...")
result = await aiService.callAi(
prompt=generationPromptRequest,
documents=None,
options=None
)
print(f"🔍 DEBUG: AI generation prompt result: '{result}'")
# Replace the placeholder that the AI created with actual format rules
if result:
formatRules = _getFormatRules(outputFormat)
result = result.replace("PLACEHOLDER_FOR_FORMAT_RULES", formatRules)
# Debug output
print(f"🔍 DEBUG: Generation Prompt: {result if result else 'None'}")
return result if result else f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content. User requirements: {userPrompt}"
except Exception as e:
# Fallback on any error - preserve user prompt for language instructions
print(f"🔍 DEBUG: AI generation prompt failed: {str(e)}")
return f"Generate a comprehensive {outputFormat} document titled '{title}' based on the extracted content. User requirements: {userPrompt}"
def _getFormatRules(outputFormat: str) -> str:
"""
Get format-specific rules for JSON-based generation.
Since we now use standardized JSON, all formats follow the same rules.
"""
return """
- Generate content in standardized JSON format following the document schema
- Tables: Use JSON table format with headers and rows arrays
- Lists: Use JSON list format with items array
- Text: Use JSON paragraph format with text field
- Headings: Use JSON heading format with level field
- Structure: Follow the document JSON schema exactly
""".strip()
async def _parseExtractionIntent(userPrompt: str, outputFormat: str, aiService=None) -> str:
"""
Use AI to extract the core content intention from the user prompt.
Focus on WHAT the user wants to extract, not HOW to format it.
"""
if not aiService:
# Fallback if no AI service available
return "Extract all relevant content from the document according to the user's requirements"
try:
# Protect userPrompt from injection by escaping quotes and newlines
safeUserPrompt = userPrompt.replace('"', '\\"').replace("'", "\\'").replace('\n', ' ').replace('\r', ' ')
# Simple AI call to extract the intention
extractionPrompt = f"""
Extract the core content intention from this user request. Focus on WHAT raw data/content they want extracted.
User request: "{safeUserPrompt}"
Return only the content intention in a simple format like "Extract: [content description]"
Focus on extracting raw data, tables, lists, and factual content - NOT summaries or analysis.
If the user mentions a table, extract the actual table data with rows and columns.
If the user mentions a list, extract the actual list items.
IMPORTANT: Preserve any language requirements in your response.
Do not include formatting instructions, file types, or output methods.
"""
# Call AI service to extract intention
print(f"🔍 DEBUG: Calling AI for extraction intent...")
result = await aiService.callAi(
prompt=extractionPrompt,
documents=None,
options=None
)
print(f"🔍 DEBUG: AI extraction intent result: '{result}'")
return result if result else f"Extract all relevant content from the document according to the user's requirements: {userPrompt}"
except Exception as e:
# Fallback on any error - preserve user prompt for language instructions
print(f"🔍 DEBUG: AI extraction intent failed: {str(e)}")
return f"Extract all relevant content from the document according to the user's requirements: {userPrompt}"

rename_renderers.py Normal file
View file

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""
Script to rename renderer files from <name>_renderer.py to renderer<Name>.py
and update all references in the codebase.
"""
import os
import re
import shutil
from pathlib import Path
from typing import Dict, List, Tuple
def get_renderer_files(renderers_dir: Path) -> List[Tuple[str, str]]:
"""Get list of renderer files to rename."""
renderer_files = []
for file_path in renderers_dir.glob("*_renderer.py"):
if file_path.name not in ['base_renderer.py', 'registry.py']:
old_name = file_path.name
# Extract the name part (e.g., "csv" from "csv_renderer.py")
name_part = old_name.replace('_renderer.py', '')
# Create new name (e.g., "rendererCsv.py")
new_name = f"renderer{name_part.capitalize()}.py"
renderer_files.append((old_name, new_name))
return renderer_files
def update_file_imports(file_path: Path, old_to_new: Dict[str, str]) -> bool:
"""Update import statements in a file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
changes_made = False
# Update import statements
for old_name, new_name in old_to_new.items():
old_module = old_name.replace('.py', '')
new_module = new_name.replace('.py', '')
# Pattern for from .old_module import
pattern1 = rf'from \.{re.escape(old_module)} import'
replacement1 = f'from .{new_module} import'
if re.search(pattern1, content):
content = re.sub(pattern1, replacement1, content)
changes_made = True
# Pattern for from modules.services.serviceGeneration.renderers.old_module import
pattern2 = rf'from modules\.services\.serviceGeneration\.renderers\.{re.escape(old_module)} import'
replacement2 = f'from modules.services.serviceGeneration.renderers.{new_module} import'
if re.search(pattern2, content):
content = re.sub(pattern2, replacement2, content)
changes_made = True
if changes_made:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"✅ Updated imports in: {file_path}")
return True
else:
print(f" No imports to update in: {file_path}")
return False
except Exception as e:
print(f"❌ Error updating {file_path}: {str(e)}")
return False
def update_class_names_in_file(file_path: Path, old_to_new: Dict[str, str]) -> bool:
"""Update class names in renderer files."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
original_content = content
changes_made = False
# Update class names
for old_name, new_name in old_to_new.items():
old_module = old_name.replace('.py', '')
new_module = new_name.replace('.py', '')
# Extract the name part for class name
name_part = old_module.replace('_renderer', '')
old_class = f"{name_part.capitalize()}Renderer"
new_class = f"Renderer{name_part.capitalize()}"
# Update class definition
pattern1 = rf'class {re.escape(old_class)}\('
replacement1 = f'class {new_class}('
if re.search(pattern1, content):
content = re.sub(pattern1, replacement1, content)
changes_made = True
# Update class instantiation
pattern2 = rf'{re.escape(old_class)}\('
replacement2 = f'{new_class}('
if re.search(pattern2, content):
content = re.sub(pattern2, replacement2, content)
changes_made = True
if changes_made:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"✅ Updated class names in: {file_path}")
return True
else:
print(f" No class names to update in: {file_path}")
return False
except Exception as e:
print(f"❌ Error updating class names in {file_path}: {str(e)}")
return False
def main():
"""Main function to rename renderer files and update references."""
print("🔄 Starting renderer file renaming process...")
# Get the gateway directory
gateway_dir = Path(__file__).parent
renderers_dir = gateway_dir / "modules" / "services" / "serviceGeneration" / "renderers"
if not renderers_dir.exists():
print(f"❌ Renderers directory not found: {renderers_dir}")
return
print(f"📁 Working in directory: {renderers_dir}")
# Get list of files to rename
renderer_files = get_renderer_files(renderers_dir)
if not renderer_files:
print(" No renderer files found to rename.")
return
print(f"📋 Found {len(renderer_files)} renderer files to rename:")
for old_name, new_name in renderer_files:
print(f" {old_name}{new_name}")
# Create mapping dictionary
old_to_new = {old_name: new_name for old_name, new_name in renderer_files}
# Step 1: Update imports in all Python files
print("\n🔄 Step 1: Updating import statements...")
updated_files = []
# Search in gateway directory
for py_file in gateway_dir.rglob("*.py"):
if py_file.name != "rename_renderers.py": # Skip this script
if update_file_imports(py_file, old_to_new):
updated_files.append(py_file)
print(f"✅ Updated imports in {len(updated_files)} files")
# Step 2: Update class names in renderer files
print("\n🔄 Step 2: Updating class names in renderer files...")
class_updated_files = []
for old_name, new_name in renderer_files:
old_file_path = renderers_dir / old_name
if old_file_path.exists():
if update_class_names_in_file(old_file_path, old_to_new):
class_updated_files.append(old_file_path)
print(f"✅ Updated class names in {len(class_updated_files)} files")
# Step 3: Rename the files
print("\n🔄 Step 3: Renaming files...")
renamed_files = []
for old_name, new_name in renderer_files:
old_file_path = renderers_dir / old_name
new_file_path = renderers_dir / new_name
if old_file_path.exists():
try:
shutil.move(str(old_file_path), str(new_file_path))
renamed_files.append((old_name, new_name))
print(f"✅ Renamed: {old_name}{new_name}")
except Exception as e:
print(f"❌ Error renaming {old_name}: {str(e)}")
else:
print(f"⚠️ File not found: {old_name}")
print(f"\n🎉 Renaming process completed!")
print(f"📊 Summary:")
print(f" - Files renamed: {len(renamed_files)}")
print(f" - Import statements updated: {len(updated_files)}")
print(f" - Class names updated: {len(class_updated_files)}")
if renamed_files:
print(f"\n📋 Renamed files:")
for old_name, new_name in renamed_files:
print(f"{old_name}{new_name}")
if __name__ == "__main__":
main()
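
# Usage sketch (assumption: the script sits in the gateway directory, since it resolves
# the renderers folder relative to its own location):
#
#   python rename_renderers.py
#
# The rename happens in place, so running it on a clean working tree under version
# control makes the changes easy to review or revert.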

View file

@@ -148,31 +148,15 @@ async def process_documents_and_generate_summary():
# Call the main AI service directly - let it handle everything including DOCX generation
logger.info("🤖 Calling main AI service with intelligent merging...")
# Test different AI operations end-to-end
test_prompts = [
{
"name": "Document Analysis",
"prompt": "Analyze these documents and provide a comprehensive summary of their content, key points, and important information.",
"outputFormat": None # Text response
},
{
"name": "DOCX Generation",
"prompt": "Create a professional DOCX document summarizing the key information from these documents.",
"outputFormat": "docx"
},
{
"name": "PDF Generation",
"prompt": "Generate a PDF report analyzing these documents with sections for each document type.",
"outputFormat": "pdf"
}
]
# Run a single end-to-end test to avoid the loop issue
logger.info("🧪 Running single end-to-end test...")
# userPrompt = "Analyze these documents and create a comprehensive DOCX summary document including: 1) Document types and purposes, 2) Key information and main points, 3) Important details and numbers, 4) Notable sections, 5) Overall assessment and recommendations."
userPrompt = "Create a docx file containing a summary and the COMPLETE list from the pdf file, having one additional column with a 'x' marker for all items, which are yellow highlighted."
# userPrompt = "Create a docx file containing a summary and the COMPLETE list from the pdf file, having one additional column with a 'x' marker for all items, which are yellow highlighted."
userPrompt = "Create a docx file containing the combined documents in french language."
try:
# Single AI call with DOCX generation
@@ -308,7 +292,12 @@ async def process_documents_and_generate_summary():
if content:
text_path = output_dir / f"{test_name}_content_{timestamp}.txt"
with open(text_path, 'w', encoding='utf-8') as f:
f.write(content)
# Handle both string and dictionary content
if isinstance(content, dict):
import json
f.write(json.dumps(content, indent=2, ensure_ascii=False))
else:
f.write(str(content))
logger.info(f"✅ Content saved: {text_path}")
elif isinstance(response, str):

View file

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
Test script to verify the fallback mechanism in interfaceAiObjects.py
"""
import asyncio
import sys
import os
import logging
from pathlib import Path
# Add the gateway directory to the Python path
gateway_dir = Path(__file__).parent
sys.path.insert(0, str(gateway_dir))
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def test_fallback_mechanism():
"""Test the fallback mechanism by simulating a failing primary model."""
try:
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
logger.info("🧪 Testing fallback mechanism...")
# Create AiObjects instance
ai_objects = await AiObjects.create()
logger.info("✅ AiObjects created successfully")
# Test 1: Normal operation (should work with primary model)
logger.info("📝 Test 1: Normal operation")
request = AiCallRequest(
prompt="Hello, this is a test prompt. Please respond with 'Test successful'.",
context="",
options=AiCallOptions(operationType=OperationType.GENERAL)
)
try:
response = await ai_objects.call(request)
logger.info(f"✅ Test 1 successful: {response.modelName} - {response.content[:50]}...")
except Exception as e:
logger.warning(f"⚠️ Test 1 failed: {str(e)}")
# Test 2: Image analysis fallback
logger.info("🖼️ Test 2: Image analysis fallback")
try:
# Create a dummy image data (base64 encoded 1x1 pixel)
dummy_image = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg=="
result = await ai_objects.callImage(
prompt="Describe this image",
imageData=dummy_image,
mimeType="image/png",
options=AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS)
)
logger.info(f"✅ Test 2 successful: {result[:50]}...")
except Exception as e:
logger.warning(f"⚠️ Test 2 failed: {str(e)}")
# Test 3: Test fallback model selection
logger.info("🔄 Test 3: Fallback model selection")
fallback_models = ai_objects._getFallbackModels(OperationType.GENERAL)
logger.info(f"✅ Fallback models for GENERAL: {fallback_models}")
fallback_models_image = ai_objects._getFallbackModels(OperationType.IMAGE_ANALYSIS)
logger.info(f"✅ Fallback models for IMAGE_ANALYSIS: {fallback_models_image}")
logger.info("🎉 Fallback mechanism test completed!")
except Exception as e:
logger.error(f"❌ Test failed: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_fallback_mechanism())

BIN
test_json_to_docx.docx Normal file

Binary file not shown.

test_json_to_docx.py Normal file
View file

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""
Test script for JSON-to-DOCX rendering pipeline.
"""
import asyncio
import json
import sys
import os
from modules.services.serviceGeneration.renderers.docx_renderer import DocxRenderer
async def test_json_to_docx():
"""Test the JSON-to-DOCX rendering pipeline."""
# Create test JSON document
test_json = {
"metadata": {
"title": "Test Document",
"version": "1.0"
},
"sections": [
{
"id": "heading1",
"type": "heading",
"data": {
"level": 1,
"text": "Document Overview"
}
},
{
"id": "paragraph1",
"type": "paragraph",
"data": {
"text": "This is a test paragraph to verify JSON-to-DOCX rendering works correctly."
}
},
{
"id": "table1",
"type": "table",
"data": {
"headers": ["Name", "Quantity", "Status"],
"rows": [
["Item 1", "5", "Active"],
["Item 2", "3", "Inactive"],
["Item 3", "10", "Active"]
]
}
},
{
"id": "list1",
"type": "bullet_list",
"data": {
"items": [
"First bullet point",
"Second bullet point",
"Third bullet point"
]
}
},
{
"id": "heading2",
"type": "heading",
"data": {
"level": 2,
"text": "Summary"
}
},
{
"id": "paragraph2",
"type": "paragraph",
"data": {
"text": "This document demonstrates the new JSON-based rendering system."
}
}
]
}
print("🧪 Testing JSON-to-DOCX rendering...")
print(f"📄 Test document has {len(test_json['sections'])} sections")
try:
# Create renderer
renderer = DocxRenderer()
# Test rendering
docx_content, mime_type = await renderer.render(
extracted_content=test_json,
title="Test Document",
user_prompt="Create a test document"
)
print(f"✅ Rendering successful!")
print(f"📊 MIME type: {mime_type}")
print(f"📏 Content length: {len(docx_content)} characters")
print(f"🔍 Content preview: {docx_content[:100]}...")
# Save test file
import base64
docx_bytes = base64.b64decode(docx_content)
with open("test_json_to_docx.docx", "wb") as f:
f.write(docx_bytes)
print(f"💾 Test DOCX saved as: test_json_to_docx.docx")
return True
except Exception as e:
print(f"❌ Rendering failed: {str(e)}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = asyncio.run(test_json_to_docx())
if success:
print("\n🎉 JSON-to-DOCX rendering test PASSED!")
else:
print("\n💥 JSON-to-DOCX rendering test FAILED!")
sys.exit(1)