# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Structure Generator for hierarchical document generation.
Generates document skeleton with section placeholders.
"""

import logging
|
|
import json
|
|
from typing import Dict, Any, Optional, List
|
|
from modules.datamodels.datamodelJson import jsonTemplateDocument
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StructureGenerator:
    """Generates document structure with section placeholders.

    The generator builds a prompt, asks the AI service for a JSON skeleton
    and normalises the answer (ids, order, complexity, generation hints and
    image configuration). Sections are returned with empty ``elements``
    arrays.
    """

    # Substrings of a generation hint that are treated as "long content".
    _LONG_CONTENT_KEYWORDS = (
        "chapter", "long", "detailed", "comprehensive",
        "extensive", "full", "complete story",
    )

    # Substrings that mark a hint as already describing something visual.
    _VISUAL_KEYWORDS = (
        "illustration", "image", "picture", "visual", "depict", "show", "drawing",
    )

    # Section-id prefixes removed when deriving a hint from the id. The most
    # specific prefixes come first; the generic "section_" must stay last.
    _SECTION_ID_PREFIXES = (
        "section_heading_", "section_paragraph_", "section_bullet_list_",
        "section_code_block_", "section_image_", "section_table_", "section_",
    )

    def __init__(self, services: Any):
        """Store the service container used for AI calls and utilities."""
        self.services = services

    async def generateStructure(
        self,
        userPrompt: str,
        documentList: Optional[Any] = None,
        cachedContent: Optional[Dict[str, Any]] = None,
        maxSectionLength: int = 500,
        existingImages: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, Any]:
        """
        Generate document structure with sections.

        Args:
            userPrompt: User's original prompt
            documentList: Optional document references (accepted for
                interface compatibility; not read by this method)
            cachedContent: Optional extracted content cache
            maxSectionLength: Maximum words for simple sections
            existingImages: Optional list of existing images to include.
                When None, images are taken from
                cachedContent["imageDocuments"] if present.

        Returns:
            Document structure with empty elements arrays

        Raises:
            ValueError: If the AI response is empty, contains no JSON, or
                the JSON lacks both 'documents' and 'sections'.
        """
        try:
            # Pass existingImages through unchanged: replacing None with []
            # here would disable the cachedContent fallback implemented in
            # _createStructurePrompt.
            structurePrompt = self._createStructurePrompt(
                userPrompt=userPrompt,
                cachedContent=cachedContent,
                maxSectionLength=maxSectionLength,
                existingImages=existingImages
            )

            self._writeDebugFile(
                structurePrompt,
                "document_generation_structure_prompt",
                "structure prompt"
            )

            # Kept function-local as in the original.
            # NOTE(review): presumably avoids an import cycle - confirm.
            from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum

            options = AiCallOptions(
                operationType=OperationTypeEnum.DATA_GENERATE,
                resultFormat="json"
            )

            aiResponse = await self.services.ai.callAiContent(
                prompt=structurePrompt,
                options=options,
                outputFormat="json"
            )

            responseContent = aiResponse.content if aiResponse and aiResponse.content else ''

            # The response is logged before validation so that empty
            # answers are visible in the debug output as well.
            self._writeDebugFile(
                responseContent,
                "document_generation_structure_response",
                "structure response"
            )

            if not responseContent:
                raise ValueError("AI structure generation returned empty response")

            extractedJson = self.services.utils.jsonExtractString(responseContent)
            if not extractedJson:
                raise ValueError("No JSON found in AI structure response")

            structure = json.loads(extractedJson)

            return self._validateAndEnhanceStructure(structure, maxSectionLength)

        except Exception as e:
            logger.error(f"Error generating structure: {str(e)}")
            raise

    def _writeDebugFile(self, content: str, fileLabel: str, description: str) -> None:
        """Best-effort debug dump; silently skipped when no writer exists.

        Args:
            content: Text to write
            fileLabel: Label handed to services.utils.writeDebugFile
            description: Human-readable name used in the failure log line
        """
        utils = getattr(self.services, 'utils', None) if self.services else None
        writer = getattr(utils, 'writeDebugFile', None)
        if writer is None:
            return
        try:
            writer(content, fileLabel)
        except Exception as e:
            # Debug output must never break document generation.
            logger.debug(f"Could not write debug file for {description}: {e}")

    def _createStructurePrompt(
        self,
        userPrompt: str,
        cachedContent: Optional[Dict[str, Any]] = None,
        maxSectionLength: int = 500,
        existingImages: Optional[List[Dict[str, Any]]] = None
    ) -> str:
        """
        Create prompt for structure generation.

        Args:
            userPrompt: User's original prompt, embedded verbatim
            cachedContent: Optional cache; "extractedContent" is rendered
                into the prompt and "imageDocuments" is the image fallback
            maxSectionLength: Word threshold quoted in the complexity rule
            existingImages: Images to list in the prompt. None means
                "use cachedContent['imageDocuments'] if available"; an
                explicit list (even empty) is used as given.

        Returns:
            The complete prompt text
        """
        userLanguage = self._getUserLanguage()

        # Format cached content if available
        cachedContentText = ""
        if cachedContent and cachedContent.get("extractedContent"):
            cachedContentText = self._formatCachedContent(cachedContent)

        # Use provided existingImages or extract from cachedContent
        if existingImages is None:
            existingImages = []
            if cachedContent and cachedContent.get("imageDocuments"):
                existingImages = cachedContent.get("imageDocuments", [])

        structureTemplate = jsonTemplateDocument.replace("{{DOCUMENT_TITLE}}", "Document Title")

        separator = '=' * 80
        sourceBlock = cachedContentText if cachedContentText else "No source documents provided."
        imagesBlock = (
            self._formatExistingImages(existingImages)
            if existingImages
            else "No existing images provided."
        )

        # The numbered instructions run 1..7 in reading order (the original
        # text was numbered 1, 2, 3, 4, 6, 7, 5).
        prompt = f"""{separator}
USER REQUEST:
{separator}
{userPrompt}
{separator}

TASK: Generate a document STRUCTURE (skeleton) with sections.
Do NOT generate actual content yet - only the structure.

{separator}
EXTRACTED CONTENT (if available):
{separator}
{sourceBlock}
{separator}

INSTRUCTIONS:
1. Analyze the user request and extracted content
2. Create a document structure with CONTENT sections only
3. For each section, specify:
- id: Unique identifier (e.g., "section_title_1", "section_image_1")
- content_type: "heading" | "paragraph" | "image" | "table" | "bullet_list" | "code_block"
- complexity: "simple" (can generate directly) or "complex" (needs sub-prompt)
- generation_hint: Brief description of what content should be generated
- image_prompt: (only for image sections) Detailed prompt for image generation
- order: Section order number (starting from 1)
- elements: [] (empty array - will be populated later)

4. Identify image sections:
- If user requests illustrations/images, create image sections
- If existing images are provided in documentList (check EXISTING IMAGES section below), create image sections that reference them
- Add image_prompt field with detailed description for image generation (only for new images)
- Set complexity to "complex"
- For existing images: Set image_source to "existing" and image_reference_id to the image document ID
- Example for new image: {{"id": "section_image_1", "content_type": "image", "complexity": "complex", "generation_hint": "Illustration for chapter 1", "image_prompt": "A detailed description for image generation", "order": 2, "elements": []}}
- Example for existing image: {{"id": "section_image_1", "content_type": "image", "complexity": "simple", "generation_hint": "Include provided image", "image_source": "existing", "image_reference_id": "doc_id_here", "order": 2, "elements": []}}

{separator}
EXISTING IMAGES (to include in document):
{separator}
{imagesBlock}
{separator}

5. Identify complex text sections:
- Long chapters (>{maxSectionLength} words expected) should be marked as "complex"
- Short paragraphs/headings should be "simple"

6. Return ONLY valid JSON following this structure:
{structureTemplate}

7. CRITICAL RULES:
- Return ONLY valid JSON (no comments, no trailing commas, double quotes only)
- Follow the exact JSON schema structure provided
- IMPORTANT: All sections MUST have empty elements arrays: "elements": [] (the template shows examples with content, but you must use empty arrays)
- ALL sections MUST include "generation_hint" field with a brief description of what content should be generated
- ALL sections MUST include "complexity" field: "simple" for short content, "complex" for long chapters/images
- Image sections MUST include "image_prompt" field with detailed description for image generation
- Order numbers MUST start from 1 (not 0)
- All content must be in the language '{userLanguage}'
- Do NOT generate actual content - only structure (skeleton)
- Use only supported content_type values: "heading", "paragraph", "image", "table", "bullet_list", "code_block"

Return ONLY the JSON structure. No explanations.
"""
        return prompt

    def _validateAndEnhanceStructure(
        self,
        structure: Dict[str, Any],
        maxSectionLength: int
    ) -> Dict[str, Any]:
        """
        Validate structure and enhance with complexity identification.

        A single-document answer ({"sections": [...]}) is wrapped into the
        multi-document format. Every section is then normalised in place.

        Args:
            structure: Parsed JSON returned by the AI
            maxSectionLength: Forwarded to the complexity heuristic

        Returns:
            The normalised structure (multi-document format)

        Raises:
            ValueError: If structure is not a dict or has neither
                'documents' nor 'sections'.
        """
        try:
            if not isinstance(structure, dict):
                raise ValueError("Structure must be a JSON object with 'documents' or 'sections'")

            if "documents" not in structure:
                if "sections" not in structure:
                    raise ValueError("Structure missing 'documents' or 'sections' field")

                # Convert single-document format to multi-document format.
                metadata = structure.get("metadata") or {}
                title = metadata.get("title", "Document") if isinstance(metadata, dict) else "Document"
                structure = {
                    "metadata": metadata,
                    "documents": [{
                        "id": "doc_1",
                        "title": title,
                        "filename": "document.json",
                        "sections": structure.get("sections", [])
                    }]
                }

            for doc in structure.get("documents") or []:
                if not isinstance(doc, dict):
                    logger.warning("Skipping document entry that is not a JSON object")
                    continue

                sections = doc.get("sections") or []

                # Drop entries that cannot be processed as sections (for
                # example strings or nulls) instead of failing the whole
                # structure.
                validSections = [s for s in sections if isinstance(s, dict)]
                if len(validSections) != len(sections):
                    logger.warning(
                        f"Dropped {len(sections) - len(validSections)} section entries that are not JSON objects"
                    )
                    doc["sections"] = validSections

                docTitle = doc.get("title", "Document")
                for position, section in enumerate(validSections, 1):
                    self._normalizeSection(section, position, docTitle, maxSectionLength)

            return structure

        except Exception as e:
            logger.error(f"Error validating structure: {str(e)}")
            raise

    def _normalizeSection(
        self,
        section: Dict[str, Any],
        position: int,
        docTitle: str,
        maxSectionLength: int
    ) -> None:
        """Fill in required section fields in place (id, order, elements,
        complexity, generation_hint and image configuration)."""
        # A present-but-empty id (null, "") is as unusable as a missing one.
        if not section.get("id"):
            section["id"] = f"section_{position}"
        sectionId = section["id"]

        # The list position is authoritative; any order value supplied by
        # the AI is overwritten.
        section["order"] = position

        if "elements" not in section:
            section["elements"] = []

        if "complexity" not in section:
            section["complexity"] = self._identifySectionComplexity(section, maxSectionLength)

        # generation_hint is required for content generation
        if not section.get("generation_hint"):
            section["generation_hint"] = self._extractMeaningfulHint(
                sectionId,
                section.get("content_type", ""),
                section.get("elements", [])
            )

        if section.get("content_type") != "image":
            return

        if section.get("image_source", "generate") == "existing":
            if "image_reference_id" not in section:
                logger.warning(f"Image section {sectionId} has image_source='existing' but no image_reference_id")
            # Existing images are simple (no generation needed)
            section["complexity"] = "simple"
            return

        # New image generation - ensure image_prompt
        if not section.get("image_prompt"):
            generationHint = section.get("generation_hint", "")
            if generationHint:
                section["image_prompt"] = self._enhanceImagePrompt(generationHint)
            else:
                # Defensive fallback based on document context
                section["image_prompt"] = f"Generate an illustration for: {docTitle}"

        section["complexity"] = "complex"

    def _identifySectionComplexity(
        self,
        section: Dict[str, Any],
        maxSectionLength: int
    ) -> str:
        """
        Identify if section is simple or complex.

        Rules:
        - Images: always complex
        - generation_hint contains a long-content keyword: complex
        - Others: simple

        Args:
            section: Section dict; "content_type" and "generation_hint"
                are read
            maxSectionLength: Not used by the keyword heuristic; kept for
                interface compatibility

        Returns:
            "simple" or "complex"
        """
        if section.get("content_type", "") == "image":
            return "complex"

        # The hint may be missing or JSON null; treat both as empty.
        generationHint = str(section.get("generation_hint") or "").lower()

        # NOTE(review): plain substring match, so "long" also matches
        # "along" and "full" matches "fully" - confirm this is intended.
        if any(keyword in generationHint for keyword in self._LONG_CONTENT_KEYWORDS):
            return "complex"

        return "simple"

    def _extractMeaningfulHint(
        self,
        sectionId: str,
        contentType: str,
        elements: List[Any]
    ) -> str:
        """
        Extract meaningful generation hint from section ID, content type, or elements.

        Args:
            sectionId: Section identifier (e.g., "section_heading_current_state");
                non-string values are converted with str()
            contentType: Content type (e.g., "heading", "paragraph")
            elements: Existing elements if any

        Returns:
            Meaningful generation hint string
        """
        # AI output may carry a numeric or null id; normalise before any
        # string operation.
        sectionId = "" if sectionId is None else str(sectionId)
        sectionIdLower = sectionId.lower()

        # Text of an existing first element is the most accurate source.
        if isinstance(elements, list) and elements:
            firstElement = elements[0]
            if isinstance(firstElement, dict) and firstElement.get("text"):
                elementText = str(firstElement["text"])
                if contentType == "heading":
                    return elementText
                if contentType == "paragraph":
                    return f"Content paragraph: {elementText[:50]}..."

        # Well-known section roles win over the generic derivation.
        if "intro" in sectionIdLower:  # also covers "introduction"
            return "Introduction paragraph"
        if "conclusion" in sectionIdLower:
            return "Conclusion paragraph"
        if "footer" in sectionIdLower or "copyright" in sectionIdLower:
            return "Footer content"
        if "title" in sectionIdLower and "main" in sectionIdLower:
            return "Main document title"

        # Strip the first matching prefix, e.g.
        # "section_heading_current_state" -> "current_state".
        meaningfulPart = sectionId
        for prefix in self._SECTION_ID_PREFIXES:
            if sectionIdLower.startswith(prefix):
                meaningfulPart = sectionId[len(prefix):]
                break

        # snake_case -> Title Case, e.g. "current_state" -> "Current State"
        titleCase = " ".join(
            word.capitalize() for word in meaningfulPart.replace("_", " ").split()
        )

        if contentType == "heading":
            return titleCase if titleCase else "Section heading"
        if contentType == "paragraph":
            if titleCase:
                return f"Content paragraph about {titleCase.lower()}"
            return "Content paragraph"
        if contentType == "bullet_list":
            if titleCase:
                return f"Bullet list: {titleCase.lower()}"
            return "Bullet list items"
        if contentType == "code_block":
            return "Code content"
        if titleCase:
            return f"Content for {titleCase.lower()}"
        return f"Content for {contentType} section"

    def _extractImagePrompts(
        self,
        structure: Dict[str, Any]
    ) -> Dict[str, str]:
        """
        Extract image generation prompts from structure.

        Returns:
            Mapping section_id -> image_prompt for every image section that
            has both a non-empty id and a non-empty image_prompt
        """
        return {
            section.get("id"): section.get("image_prompt")
            for doc in structure.get("documents", [])
            for section in doc.get("sections", [])
            if section.get("content_type") == "image"
            and section.get("id")
            and section.get("image_prompt")
        }

    def _formatCachedContent(
        self,
        cachedContent: Dict[str, Any]
    ) -> str:
        """
        Format cached content for prompt inclusion.

        Entries exposing a ``parts`` attribute contribute the ``content``
        of each part; every other entry is rendered with str().

        Args:
            cachedContent: Cache dict; only "extractedContent" is read

        Returns:
            Parts joined by blank lines, "No content extracted." when there
            is nothing to show, or "Error formatting cached content." if
            formatting fails.
        """
        try:
            extractedContent = cachedContent.get("extractedContent", [])
            if not extractedContent:
                return "No content extracted."

            formattedParts = []
            for extracted in extractedContent:
                if hasattr(extracted, 'parts'):
                    for part in extracted.parts or []:
                        content = getattr(part, 'content', None)
                        if content is None:
                            continue
                        # str() so one non-text part cannot make the join
                        # fail and discard everything else.
                        formattedParts.append(str(content))
                else:
                    formattedParts.append(str(extracted))

            return "\n\n".join(formattedParts) if formattedParts else "No content extracted."

        except Exception as e:
            # Best effort: the prompt can still be built without sources.
            logger.warning(f"Error formatting cached content: {str(e)}")
            return "Error formatting cached content."

    def _enhanceImagePrompt(self, generationHint: str) -> str:
        """
        Enhance generation hint to be a proper image generation prompt.

        A hint that already contains a visual keyword is returned
        unchanged; otherwise it is prefixed with an illustration request.
        """
        hintLower = generationHint.lower()
        if any(keyword in hintLower for keyword in self._VISUAL_KEYWORDS):
            return generationHint

        return f"Create a professional illustration: {generationHint}"

    def _formatExistingImages(self, imageDocuments: List[Dict[str, Any]]) -> str:
        """Format existing images list for prompt inclusion.

        Each image yields four lines (id, file name, MIME type, alt text)
        followed by an empty separator line.
        """
        if not imageDocuments:
            return "No existing images provided."

        lines = []
        for number, imgDoc in enumerate(imageDocuments, 1):
            lines.extend((
                f"{number}. Image ID: {imgDoc.get('id')}",
                f" File Name: {imgDoc.get('fileName', 'Unknown')}",
                f" MIME Type: {imgDoc.get('mimeType', 'Unknown')}",
                f" Alt Text: {imgDoc.get('altText', 'Image')}",
                "",
            ))

        return "\n".join(lines)

    def _getUserLanguage(self) -> str:
        """Get user language for document generation.

        Returns:
            services.currentUserLanguage if set, else services.user.language
            if set, else 'en'. Unset or empty values never leak into the
            prompt.
        """
        language = None
        try:
            services = self.services
            if services:
                language = getattr(services, 'currentUserLanguage', None)
                if not language:
                    user = getattr(services, 'user', None)
                    language = getattr(user, 'language', None) if user else None
        except Exception as e:
            # Attribute access on the service container may be dynamic;
            # fall back to the default rather than fail prompt creation.
            logger.debug(f"Could not determine user language: {e}")
            language = None
        return language or 'en'  # Default fallback
|
|
|