gateway/modules/services/serviceAi/subDocumentGeneration.py


import logging
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType

logger = logging.getLogger(__name__)


class SubDocumentGeneration:
    """Document generation operations, including single-file and multi-file generation."""

    def __init__(self, services, aiObjects, documentProcessor):
        """Initialize the document generation service.

        Args:
            services: Service center instance for accessing other services
            aiObjects: Initialized AiObjects instance
            documentProcessor: Document processing service instance
        """
        self.services = services
        self.aiObjects = aiObjects
        self.documentProcessor = documentProcessor

    async def callAiWithDocumentGeneration(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]],
        options: AiCallOptions,
        outputFormat: str,
        title: Optional[str]
    ) -> Dict[str, Any]:
        """
        Handle AI calls that generate documents in a specific output format.
        Supports both single-file and multi-file generation.

        Args:
            prompt: The main prompt for the AI call
            documents: Optional list of documents to process
            options: AI call configuration options
            outputFormat: Target output format (html, pdf, docx, txt, md, json, csv, xlsx)
            title: Optional title for generated documents

        Returns:
            Dict with generated documents and metadata
        """
        try:
            # Use AI to analyze prompt intent; pass self so the analyzer can
            # reuse this service's aiObjects for the analysis call
            prompt_analysis = await self._analyzePromptIntent(prompt, self)
            logger.info(f"Prompt analysis result: {prompt_analysis}")

            if prompt_analysis.get("is_multi_file", False):
                return await self._callAiWithMultiFileGeneration(
                    prompt, documents, options, outputFormat, title, prompt_analysis
                )
            else:
                return await self._callAiWithSingleFileGeneration(
                    prompt, documents, options, outputFormat, title
                )
        except Exception as e:
            logger.error(f"Error in document generation: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "content": "",
                "rendered_content": "",
                "mime_type": "text/plain",
                "filename": f"error_{outputFormat}",
                "format": outputFormat,
                "title": title or "Error",
                "documents": []
            }
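
    # Usage sketch (illustrative only; the caller-side names `serviceCenter`,
    # `aiObjects`, `documentProcessor`, `chatDocuments`, and the hypothetical
    # helper `saveAttachment` are assumptions, not part of this module):
    #
    #   gen = SubDocumentGeneration(serviceCenter, aiObjects, documentProcessor)
    #   result = await gen.callAiWithDocumentGeneration(
    #       prompt="Create one PDF report per customer",
    #       documents=chatDocuments,
    #       options=AiCallOptions(),
    #       outputFormat="pdf",
    #       title=None,
    #   )
    #   for doc in result["documents"]:
    #       saveAttachment(doc["documentName"], doc["documentData"], doc["mimeType"])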

    async def _callAiWithSingleFileGeneration(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]],
        options: AiCallOptions,
        outputFormat: str,
        title: Optional[str]
    ) -> Dict[str, Any]:
        """Handle single-file document generation (existing functionality)."""
        try:
            # Get the format-specific extraction prompt from the generation service
            from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
            generation_service = GenerationService(self.services)

            # Use a default title if none is provided
            if not title:
                title = "AI Generated Document"

            # Get format-specific extraction prompt
            extractionPrompt = await generation_service.getExtractionPrompt(
                outputFormat=outputFormat,
                userPrompt=prompt,
                title=title,
                aiService=self
            )

            # Process documents with the format-specific prompt in JSON mode,
            # which ensures structured JSON output instead of free text
            aiResponseJson = await self._callAiJson(extractionPrompt, documents, options)

            # Validate the JSON response
            if not isinstance(aiResponseJson, dict) or "sections" not in aiResponseJson:
                raise Exception("AI response is not a valid JSON document structure")

            # Emit the raw extracted data as a chat message attachment before rendering
            try:
                await self._postRawDataChatMessage(aiResponseJson, label="raw_extraction_single")
            except Exception:
                logger.warning("Failed to emit raw extraction chat message (single-file)")

            # Generate a filename from document metadata
            parsedFilename = None
            try:
                if aiResponseJson.get("metadata", {}).get("title"):
                    title = aiResponseJson["metadata"]["title"]
                # Clean the title for use as a filename
                import re
                parsed = re.sub(r"[^a-zA-Z0-9._-]", "-", title)
                parsed = re.sub(r"-+", "-", parsed).strip('-')
                if parsed:
                    parsedFilename = f"{parsed}.{outputFormat}"
            except Exception:
                parsedFilename = None

            # Render the JSON content to the specified format
            renderedContent, mimeType = await generation_service.renderReport(
                extractedContent=aiResponseJson,
                outputFormat=outputFormat,
                title=title,
                userPrompt=prompt,
                aiService=self
            )

            # Generate a meaningful filename (use the AI-provided one if valid, else fall back)
            from datetime import datetime, UTC
            timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
            if parsedFilename and parsedFilename.lower().endswith(f".{outputFormat.lower()}"):
                filename = parsedFilename
            else:
                safeTitle = ''.join(c if c.isalnum() else '-' for c in (title or 'document')).strip('-')
                filename = f"{safeTitle or 'document'}-{timestamp}.{outputFormat}"

            # Return a structured result with document information
            return {
                "success": True,
                "content": aiResponseJson,            # Structured JSON document
                "rendered_content": renderedContent,  # Formatted content
                "mime_type": mimeType,
                "filename": filename,
                "format": outputFormat,
                "title": title,
                "documents": [{
                    "documentName": filename,
                    "documentData": renderedContent,
                    "mimeType": mimeType
                }],
                "is_multi_file": False
            }
        except Exception as e:
            logger.error(f"Error in single-file document generation: {str(e)}")
            raise
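
    # Illustrative behavior of the filename sanitization above: characters
    # outside [a-zA-Z0-9._-] become "-", runs of "-" collapse, edges are stripped:
    #   title "Q3 Report: EMEA/APAC" -> "Q3-Report-EMEA-APAC" -> "Q3-Report-EMEA-APAC.pdf"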

    async def _callAiWithMultiFileGeneration(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]],
        options: AiCallOptions,
        outputFormat: str,
        title: Optional[str],
        prompt_analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Handle multi-file document generation using AI analysis."""
        try:
            # Get the multi-file extraction prompt based on the AI analysis
            from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
            generation_service = GenerationService(self.services)

            # Use a default title if none is provided
            if not title:
                title = "AI Generated Documents"

            # Get the adaptive extraction prompt
            extraction_prompt = await generation_service.getAdaptiveExtractionPrompt(
                outputFormat=outputFormat,
                userPrompt=prompt,
                title=title,
                promptAnalysis=prompt_analysis,
                aiService=self
            )
            logger.info(f"Adaptive extraction prompt length: {len(extraction_prompt)} characters")
            logger.debug(f"Adaptive extraction prompt preview: {extraction_prompt[:500]}...")

            # Use the existing per-chunk JSON pipeline, but replace the prompt with
            # the adaptive one; this keeps document processing intact while
            # requesting the multi-file schema
            logger.debug(f"Processing documents: {len(documents) if documents else 0} documents")
            ai_response = await self.documentProcessor.processDocumentsPerChunkJsonWithPrompt(documents, extraction_prompt, options)
            logger.info(f"AI response type: {type(ai_response)}")
            logger.info(f"AI response keys: {list(ai_response.keys()) if isinstance(ai_response, dict) else 'Not a dict'}")
            logger.debug(f"AI response preview: {str(ai_response)[:500]}...")

            # Validate the response structure
            if not self._validateResponseStructure(ai_response, prompt_analysis):
                # Fall back to single-file generation if multi-file processing fails
                logger.warning(f"Multi-file processing failed - invalid response structure. Expected multi-file but got: {list(ai_response.keys()) if isinstance(ai_response, dict) else type(ai_response)}")
                logger.warning(f"Prompt analysis: {prompt_analysis}")
                logger.warning("Falling back to single-file generation")
                return await self._callAiWithSingleFileGeneration(
                    prompt, documents, options, outputFormat, title
                )

            # Emit the raw extracted data as a chat message attachment before transformation/rendering
            try:
                await self._postRawDataChatMessage(ai_response, label="raw_extraction_multi")
            except Exception:
                logger.warning("Failed to emit raw extraction chat message (multi-file)")

            # Process multiple documents
            generated_documents = []
            for i, doc_data in enumerate(ai_response.get("documents", [])):
                # Transform AI-generated sections into the renderer-compatible format
                transformed_sections = []
                for section in doc_data.get("sections", []):
                    # Convert the AI section format to the renderer section format
                    transformed_section = {
                        "id": section.get("id", f"section_{len(transformed_sections) + 1}"),
                        "type": section.get("content_type", "paragraph"),
                        "data": {
                            "text": "",
                            "elements": section.get("elements", [])
                        },
                        "order": section.get("order", len(transformed_sections) + 1)
                    }
                    # Extract text from elements for simple text-based sections
                    if section.get("content_type") in ["paragraph", "heading"]:
                        text_parts = []
                        for element in section.get("elements", []):
                            if "text" in element:
                                text_parts.append(element["text"])
                        transformed_section["data"]["text"] = "\n".join(text_parts)
                    transformed_sections.append(transformed_section)

                # Create a complete document structure for rendering
                complete_document = {
                    "metadata": {
                        "title": doc_data["title"],
                        "source_document": "multi_file_generation",
                        "document_id": doc_data.get("id", f"doc_{i+1}"),
                        "filename": doc_data.get("filename", f"document_{i+1}"),
                        "split_strategy": prompt_analysis.get("strategy", "custom")
                    },
                    "sections": transformed_sections,
                    "summary": f"Generated document: {doc_data['title']}",
                    "tags": ["multi_file", "ai_generated"]
                }

                rendered_content, mime_type = await generation_service.renderReport(
                    extractedContent=complete_document,
                    outputFormat=outputFormat,
                    title=doc_data["title"],
                    userPrompt=prompt,
                    aiService=self
                )

                # Generate a proper filename: strip any existing extension and
                # append the one matching the output format (docx, pdf, html, ...)
                base_filename = doc_data.get("filename", f"document_{i+1}")
                if '.' in base_filename:
                    base_filename = base_filename.rsplit('.', 1)[0]
                filename = f"{base_filename}.{outputFormat.lower()}"

                generated_documents.append({
                    "documentName": filename,
                    "documentData": rendered_content,
                    "mimeType": mime_type
                })

            # Save debug files for multi-file generation, only if debug is enabled
            debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
            if debug_enabled:
                try:
                    import os
                    from datetime import datetime, UTC
                    ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
                    debug_root = "./test-chat/ai"
                    debug_dir = os.path.join(debug_root, f"multifile_output_{ts}")
                    os.makedirs(debug_dir, exist_ok=True)

                    # Save metadata
                    with open(os.path.join(debug_dir, "metadata.txt"), "w", encoding="utf-8") as f:
                        f.write(f"title: {title}\n")
                        f.write(f"format: {outputFormat}\n")
                        f.write(f"documents_count: {len(generated_documents)}\n")
                        f.write(f"split_strategy: {prompt_analysis.get('strategy', 'custom')}\n")
                        f.write(f"prompt_analysis: {prompt_analysis}\n")

                    # Save each generated document
                    for i, doc in enumerate(generated_documents):
                        doc_filename = doc["documentName"]
                        doc_data = doc["documentData"]
                        # Determine the file extension from the output format
                        file_ext = f".{outputFormat.lower()}"

                        # Save the rendered document
                        output_path = os.path.join(debug_dir, f"document_{i+1}_{doc_filename}")
                        if file_ext in ['.md', '.txt', '.html', '.json', '.csv']:
                            # Text-based formats
                            with open(output_path, 'w', encoding='utf-8') as f:
                                f.write(doc_data)
                        else:
                            # Binary formats - decode from base64 if needed
                            try:
                                import base64
                                doc_bytes = base64.b64decode(doc_data)
                                with open(output_path, 'wb') as f:
                                    f.write(doc_bytes)
                            except Exception:
                                # If not base64, save as text
                                with open(output_path, 'w', encoding='utf-8') as f:
                                    f.write(doc_data)
                        logger.info(f"💾 Debug: Saved multi-file document {i+1}: {output_path}")
                    logger.info(f"💾 Debug: Multi-file output saved to: {debug_dir}")
                except Exception as e:
                    logger.warning(f"Failed to save multi-file debug output: {e}")

            return {
                "success": True,
                "content": ai_response,
                "rendered_content": None,  # Not applicable for multi-file
                "mime_type": None,         # Not applicable for multi-file
                "filename": None,          # Not applicable for multi-file
                "format": outputFormat,
                "title": title,
                "documents": generated_documents,
                "is_multi_file": True,
                "split_strategy": prompt_analysis.get("strategy", "custom")
            }
        except Exception as e:
            logger.error(f"Error in multi-file document generation: {str(e)}")
            # Fall back to single-file generation
            return await self._callAiWithSingleFileGeneration(
                prompt, documents, options, outputFormat, title
            )
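
    # Illustrative input/output of the section transformation above:
    #   in:  {"id": "s1", "content_type": "paragraph", "order": 1,
    #         "elements": [{"text": "Hello"}, {"text": "World"}]}
    #   out: {"id": "s1", "type": "paragraph", "order": 1,
    #         "data": {"text": "Hello\nWorld",
    #                  "elements": [{"text": "Hello"}, {"text": "World"}]}}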

    async def _callAiJson(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]],
        options: AiCallOptions
    ) -> Dict[str, Any]:
        """
        Handle AI calls with document processing for JSON output.
        Returns a structured JSON document instead of text.
        """
        # Process documents with JSON merging
        return await self.documentProcessor.processDocumentsPerChunkJson(documents, prompt, options)

    async def _analyzePromptIntent(self, prompt: str, ai_service=None) -> Dict[str, Any]:
        """Use AI to analyze the user prompt and determine processing requirements."""
        if not ai_service:
            return {"is_multi_file": False, "strategy": "single", "criteria": None}
        try:
            analysis_prompt = f"""
            Analyze this user request and determine whether it requires multiple file output or single file output.

            User request: "{prompt}"

            Respond with JSON only, in this exact format:
            {{
                "is_multi_file": true/false,
                "strategy": "single|per_entity|by_section|by_criteria|custom",
                "criteria": "description of how to split content",
                "file_naming_pattern": "suggested pattern for filenames",
                "reasoning": "brief explanation of the analysis"
            }}

            Consider:
            - Does the user want separate files for different entities (customers, products, etc.)?
            - Does the user want to split content into multiple documents?
            - What would be the most logical way to organize the content?
            - What language is the request in? (analyze in the original language)

            Return only the JSON response.
            """
            request_options = AiCallOptions()
            request_options.operationType = OperationType.GENERAL
            request = AiCallRequest(prompt=analysis_prompt, context="", options=request_options)
            response = await ai_service.aiObjects.call(request)

            if response and response.content:
                import json
                import re
                # Extract the JSON object from the response text
                result = response.content.strip()
                json_match = re.search(r'\{.*\}', result, re.DOTALL)
                if json_match:
                    result = json_match.group(0)
                return json.loads(result)
            else:
                return {"is_multi_file": False, "strategy": "single", "criteria": None}
        except Exception as e:
            logger.warning(f"AI prompt analysis failed: {str(e)}, defaulting to single file")
            return {"is_multi_file": False, "strategy": "single", "criteria": None}

    def _validateResponseStructure(self, response: Dict[str, Any], prompt_analysis: Dict[str, Any]) -> bool:
        """Validate that the AI response matches the expected structure."""
        try:
            if not isinstance(response, dict):
                logger.warning(f"Response validation failed: Response is not a dict, got {type(response)}")
                return False

            # Check for the multi-file structure
            if prompt_analysis.get("is_multi_file", False):
                has_documents = "documents" in response
                is_documents_list = isinstance(response.get("documents"), list)
                logger.info(f"Multi-file validation: has_documents={has_documents}, is_documents_list={is_documents_list}")
                if has_documents and is_documents_list:
                    logger.info(f"Multi-file validation passed: {len(response['documents'])} documents found")
                else:
                    logger.warning(f"Multi-file validation failed: documents key present={has_documents}, documents is list={is_documents_list}")
                    logger.warning(f"Available keys: {list(response.keys())}")
                return has_documents and is_documents_list
            else:
                has_sections = "sections" in response
                is_sections_list = isinstance(response.get("sections"), list)
                logger.info(f"Single-file validation: has_sections={has_sections}, is_sections_list={is_sections_list}")
                return has_sections and is_sections_list
        except Exception as e:
            logger.warning(f"Response validation failed with exception: {str(e)}")
            return False
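
    # Shapes accepted by _validateResponseStructure (sketches derived from the
    # checks above; fields beyond the checked keys are illustrative):
    #   multi-file:  {"documents": [{"title": ..., "filename": ..., "sections": [...]}, ...]}
    #   single-file: {"sections": [...], "metadata": {"title": ...}}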

    async def _postRawDataChatMessage(self, payload: Any, label: str = "raw_extraction") -> None:
        """
        Create a ChatMessage with the extracted raw JSON attached as a file, so the
        user has access to the data even if downstream processing fails.
        """
        try:
            services = self.services
            workflow = getattr(services, 'currentWorkflow', None)
            if not workflow:
                return

            # Serialize the payload
            import json as _json
            from datetime import datetime, UTC
            ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
            content_text = _json.dumps(payload, ensure_ascii=False, indent=2)
            content_bytes = content_text.encode('utf-8')

            # Store as a file via component storage
            file_name = f"{label}_{ts}.json"
            file_item = services.interfaceDbComponent.createFile(
                name=file_name,
                mimeType="application/json",
                content=content_bytes
            )
            services.interfaceDbComponent.createFileData(file_item.id, content_bytes)

            # Look up file info for the ChatDocument
            file_info = services.workflow.getFileInfo(file_item.id)
            doc = ChatDocument(
                messageId="",  # set after message creation
                fileId=file_item.id,
                fileName=file_info.get("fileName", file_name) if file_info else file_name,
                fileSize=file_info.get("size", len(content_bytes)) if file_info else len(content_bytes),
                mimeType=file_info.get("mimeType", "application/json") if file_info else "application/json"
            )

            # Create a message referencing the file
            messageData = {
                "workflowId": workflow.id,
                "role": "assistant",
                "message": "Raw extraction data saved",
                "status": "data",
                "sequenceNr": len(getattr(workflow, 'messages', []) or []) + 1,
                "publishedAt": services.utils.getUtcTimestamp(),
                "documentsLabel": label,
                "documents": []
            }
            message = services.workflow.createMessage(messageData)
            if not message:
                return

            # Persist the ChatDocument with its messageId
            doc.messageId = message.id
            services.interfaceDbChat.createDocument(doc.to_dict())

            # Update the message to include the document
            try:
                if not message.documents:
                    message.documents = []
                message.documents.append(doc)
                services.workflow.updateMessage(message.id, {"documents": [d.to_dict() for d in message.documents]})
            except Exception:
                pass
        except Exception:
            # Non-fatal; ignore if storage or chat creation fails
            return