568 lines
No EOL
27 KiB
Python
568 lines
No EOL
27 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
import logging
|
|
import uuid
|
|
import base64
|
|
import traceback
|
|
from typing import Any, Dict, List, Optional, Callable
|
|
from modules.datamodels.datamodelChat import ChatDocument
|
|
from modules.services.serviceGeneration.subDocumentUtility import (
|
|
getFileExtension,
|
|
getMimeTypeFromExtension,
|
|
detectMimeTypeFromContent,
|
|
detectMimeTypeFromData,
|
|
convertDocumentDataToString
|
|
)
|
|
|
|
# Module-level logger, namespaced to this module per stdlib convention.
logger = logging.getLogger(__name__)
|
|
|
|
class GenerationService:
    """Service that turns AI action results into stored documents and rendered reports."""

    def __init__(self, serviceCenter=None):
        """Bind the database interfaces directly from the service center.

        Args:
            serviceCenter: Container exposing ``interfaceDbComponent`` and
                ``interfaceDbChat``. May be None, in which case the interfaces
                are left as None.
        """
        # Keep the service center itself for sub-generators/renderers that need it.
        self.services = serviceCenter
        # Fix: honor the declared serviceCenter=None default instead of crashing
        # with AttributeError when no service center is supplied.
        self.interfaceDbComponent = getattr(serviceCenter, 'interfaceDbComponent', None)
        self.interfaceDbChat = getattr(serviceCenter, 'interfaceDbChat', None)
|
|
|
|
def processActionResultDocuments(self, actionResult, action) -> List[Dict[str, Any]]:
|
|
"""
|
|
Process documents produced by AI actions and convert them to ChatDocument format.
|
|
This function handles AI-generated document data, not document references.
|
|
Returns a list of processed document dictionaries.
|
|
"""
|
|
try:
|
|
# Read documents from the standard documents field (not data.documents)
|
|
documents = actionResult.documents if actionResult and hasattr(actionResult, 'documents') else []
|
|
|
|
if not documents:
|
|
return []
|
|
|
|
# Process each document from the AI action result
|
|
processedDocuments = []
|
|
for doc in documents:
|
|
processedDoc = self.processSingleDocument(doc, action)
|
|
if processedDoc:
|
|
processedDocuments.append(processedDoc)
|
|
|
|
return processedDocuments
|
|
except Exception as e:
|
|
logger.error(f"Error processing action result documents: {str(e)}")
|
|
return []
|
|
|
|
def processSingleDocument(self, doc: Any, action) -> Optional[Dict[str, Any]]:
|
|
"""Process a single document from action result with simplified logic"""
|
|
try:
|
|
# ActionDocument objects have documentName, documentData, and mimeType
|
|
mime_type = doc.mimeType
|
|
if mime_type == "application/octet-stream":
|
|
content = doc.documentData
|
|
# Detect MIME without relying on a service center
|
|
mime_type = detectMimeTypeFromContent(content, doc.documentName)
|
|
|
|
# WICHTIG: Für ActionDocuments mit validationMetadata (z.B. context.extractContent)
|
|
# müssen wir das gesamte ActionDocument serialisieren, nicht nur documentData
|
|
document_data = doc.documentData
|
|
if hasattr(doc, 'validationMetadata') and doc.validationMetadata:
|
|
# Wenn validationMetadata vorhanden ist, serialisiere das gesamte ActionDocument-Format
|
|
if mime_type == "application/json":
|
|
# Erstelle ActionDocument-Format mit validationMetadata und documentData
|
|
if hasattr(document_data, 'model_dump'):
|
|
# Pydantic v2
|
|
document_data_dict = document_data.model_dump()
|
|
elif hasattr(document_data, 'dict'):
|
|
# Pydantic v1
|
|
document_data_dict = document_data.dict()
|
|
elif isinstance(document_data, dict):
|
|
document_data_dict = document_data
|
|
else:
|
|
document_data_dict = {"data": str(document_data)}
|
|
|
|
# Erstelle ActionDocument-Format
|
|
document_data = {
|
|
"validationMetadata": doc.validationMetadata,
|
|
"documentData": document_data_dict
|
|
}
|
|
|
|
return {
|
|
'fileName': doc.documentName,
|
|
'fileSize': len(str(document_data)),
|
|
'mimeType': mime_type,
|
|
'content': document_data,
|
|
'document': doc
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error processing single document: {str(e)}")
|
|
return None
|
|
|
|
def createDocumentsFromActionResult(self, actionResult, action, workflow, message_id=None) -> List[Any]:
|
|
"""
|
|
Create actual document objects from action result and store them in the system.
|
|
Returns a list of created document objects with proper workflow context.
|
|
"""
|
|
try:
|
|
processed_docs = self.processActionResultDocuments(actionResult, action)
|
|
|
|
createdDocuments = []
|
|
for i, doc_data in enumerate(processed_docs):
|
|
try:
|
|
documentName = doc_data['fileName']
|
|
documentData = doc_data['content']
|
|
mimeType = doc_data['mimeType']
|
|
|
|
# Handle binary data (images, PDFs, Office docs) differently from text
|
|
# Check if this is a binary MIME type
|
|
binaryMimeTypes = {
|
|
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
|
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
|
"application/pdf",
|
|
"image/png", "image/jpeg", "image/jpg", "image/gif", "image/webp", "image/bmp", "image/svg+xml",
|
|
}
|
|
|
|
isBinaryMimeType = mimeType in binaryMimeTypes
|
|
base64encoded = False
|
|
content = None
|
|
|
|
if isBinaryMimeType:
|
|
# For binary data, handle bytes vs base64 string vs regular string
|
|
if isinstance(documentData, bytes):
|
|
# Already bytes - encode to base64 string for storage
|
|
# base64 is already imported at module level
|
|
content = base64.b64encode(documentData).decode('utf-8')
|
|
base64encoded = True
|
|
elif isinstance(documentData, str):
|
|
# Check if it's already valid base64
|
|
# base64 is already imported at module level
|
|
try:
|
|
# Try to decode to verify it's base64
|
|
base64.b64decode(documentData, validate=True)
|
|
# Valid base64 - use as is
|
|
content = documentData
|
|
base64encoded = True
|
|
except Exception:
|
|
# Not valid base64 - might be raw string, try encoding
|
|
try:
|
|
content = base64.b64encode(documentData.encode('utf-8')).decode('utf-8')
|
|
base64encoded = True
|
|
except Exception:
|
|
logger.warning(f"Could not process binary data for {documentName}, skipping")
|
|
continue
|
|
else:
|
|
# Other types - convert to string then base64
|
|
# base64 is already imported at module level
|
|
try:
|
|
content = base64.b64encode(str(documentData).encode('utf-8')).decode('utf-8')
|
|
base64encoded = True
|
|
except Exception:
|
|
logger.warning(f"Could not encode binary data for {documentName}, skipping")
|
|
continue
|
|
else:
|
|
# Text data - convert to string
|
|
content = convertDocumentDataToString(documentData, getFileExtension(documentName))
|
|
|
|
# Skip empty or minimal content
|
|
minimalContentPatterns = ['{}', '[]', 'null', '""', "''"]
|
|
if not content or content.strip() == "" or content.strip() in minimalContentPatterns:
|
|
logger.warning(f"Empty or minimal content for document {documentName}, skipping")
|
|
continue
|
|
|
|
# Normalize file extension based on mime type if missing or incorrect
|
|
try:
|
|
mime_to_ext = {
|
|
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
|
|
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
|
|
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
|
|
"application/pdf": ".pdf",
|
|
"text/html": ".html",
|
|
"text/markdown": ".md",
|
|
"text/plain": ".txt",
|
|
"application/json": ".json",
|
|
"image/png": ".png",
|
|
"image/jpeg": ".jpg",
|
|
"image/jpg": ".jpg",
|
|
"image/gif": ".gif",
|
|
"image/webp": ".webp",
|
|
"image/bmp": ".bmp",
|
|
"image/svg+xml": ".svg",
|
|
}
|
|
expectedExt = mime_to_ext.get(mimeType)
|
|
if expectedExt:
|
|
if not documentName.lower().endswith(expectedExt):
|
|
# Append/replace extension to match mime type
|
|
if "." in documentName:
|
|
documentName = documentName.rsplit(".", 1)[0] + expectedExt
|
|
else:
|
|
documentName = documentName + expectedExt
|
|
except Exception:
|
|
pass
|
|
|
|
# Create document with file in one step using interfaces directly
|
|
document = self._createDocument(
|
|
fileName=documentName,
|
|
mimeType=mimeType,
|
|
content=content,
|
|
base64encoded=base64encoded,
|
|
messageId=message_id
|
|
)
|
|
if document:
|
|
# Set workflow context on the document if possible
|
|
self._setDocumentWorkflowContext(document, action, workflow)
|
|
createdDocuments.append(document)
|
|
else:
|
|
logger.error(f"Failed to create ChatDocument object for {documentName}")
|
|
except Exception as e:
|
|
logger.error(f"Error creating document {doc_data.get('fileName', 'unknown')}: {str(e)}")
|
|
continue
|
|
|
|
return createdDocuments
|
|
except Exception as e:
|
|
logger.error(f"Error creating documents from action result: {str(e)}")
|
|
return []
|
|
|
|
def _setDocumentWorkflowContext(self, document, action, workflow):
|
|
"""Set workflow context on a document for proper routing and labeling"""
|
|
try:
|
|
# Get current workflow context directly from workflow object
|
|
workflowContext = self._getWorkflowContext(workflow)
|
|
workflowStats = self._getWorkflowStats(workflow)
|
|
|
|
currentRound = workflowContext.get('currentRound', 0)
|
|
currentTask = workflowContext.get('currentTask', 0)
|
|
currentAction = workflowContext.get('currentAction', 0)
|
|
|
|
# Try to set workflow context attributes if they exist
|
|
if hasattr(document, 'roundNumber'):
|
|
document.roundNumber = currentRound
|
|
if hasattr(document, 'taskNumber'):
|
|
document.taskNumber = currentTask
|
|
if hasattr(document, 'actionNumber'):
|
|
document.actionNumber = currentAction
|
|
if hasattr(document, 'actionId'):
|
|
document.actionId = action.id if hasattr(action, 'id') else None
|
|
|
|
# Set additional workflow metadata if available
|
|
if hasattr(document, 'workflowId'):
|
|
document.workflowId = workflowStats.get('workflowId', workflow.id if hasattr(workflow, 'id') else None)
|
|
if hasattr(document, 'workflowStatus'):
|
|
document.workflowStatus = workflowStats.get('workflowStatus', workflow.status if hasattr(workflow, 'status') else 'unknown')
|
|
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Could not set workflow context on document: {str(e)}")
|
|
|
|
def _createDocument(self, fileName: str, mimeType: str, content: str, base64encoded: bool = True, messageId: str = None) -> Optional[ChatDocument]:
|
|
"""Create file and ChatDocument using interfaces without service indirection."""
|
|
try:
|
|
if not self.interfaceDbComponent:
|
|
logger.error("Component interface not available for document creation")
|
|
return None
|
|
# Convert content to bytes
|
|
if base64encoded:
|
|
# base64 is already imported at module level
|
|
content_bytes = base64.b64decode(content)
|
|
else:
|
|
content_bytes = content.encode('utf-8')
|
|
# Create file and store data
|
|
file_item = self.interfaceDbComponent.createFile(
|
|
name=fileName,
|
|
mimeType=mimeType,
|
|
content=content_bytes
|
|
)
|
|
self.interfaceDbComponent.createFileData(file_item.id, content_bytes)
|
|
# Collect file info
|
|
file_info = self._getFileInfo(file_item.id)
|
|
if not file_info:
|
|
logger.error(f"Could not get file info for fileId: {file_item.id}")
|
|
return None
|
|
# Build ChatDocument
|
|
document = ChatDocument(
|
|
id=str(uuid.uuid4()),
|
|
messageId=messageId or "",
|
|
fileId=file_item.id,
|
|
fileName=file_info.get("fileName", fileName),
|
|
fileSize=file_info.get("size", 0),
|
|
mimeType=file_info.get("mimeType", mimeType)
|
|
)
|
|
# Ensure document can access component interface later
|
|
if hasattr(document, 'setComponentInterface') and self.interfaceDbComponent:
|
|
try:
|
|
document.setComponentInterface(self.interfaceDbComponent)
|
|
except Exception:
|
|
pass
|
|
return document
|
|
except Exception as e:
|
|
logger.error(f"Error creating document: {str(e)}")
|
|
return None
|
|
|
|
def _getFileInfo(self, fileId: str) -> Optional[Dict[str, Any]]:
|
|
try:
|
|
if not self.interfaceDbComponent:
|
|
return None
|
|
file_item = self.interfaceDbComponent.getFile(fileId)
|
|
if file_item:
|
|
return {
|
|
"id": file_item.id,
|
|
"fileName": file_item.fileName,
|
|
"size": file_item.fileSize,
|
|
"mimeType": file_item.mimeType,
|
|
"fileHash": getattr(file_item, 'fileHash', None),
|
|
"creationDate": getattr(file_item, 'creationDate', None)
|
|
}
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error getting file info for {fileId}: {str(e)}")
|
|
return None
|
|
|
|
def _getWorkflowContext(self, workflow) -> Dict[str, int]:
|
|
try:
|
|
return {
|
|
'currentRound': getattr(workflow, 'currentRound', 0),
|
|
'currentTask': getattr(workflow, 'currentTask', 0),
|
|
'currentAction': getattr(workflow, 'currentAction', 0)
|
|
}
|
|
except Exception:
|
|
return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}
|
|
|
|
def _getWorkflowStats(self, workflow) -> Dict[str, Any]:
|
|
try:
|
|
context = self._getWorkflowContext(workflow)
|
|
return {
|
|
'currentRound': context['currentRound'],
|
|
'currentTask': context['currentTask'],
|
|
'currentAction': context['currentAction'],
|
|
'totalTasks': getattr(workflow, 'totalTasks', 0),
|
|
'totalActions': getattr(workflow, 'totalActions', 0),
|
|
'workflowStatus': getattr(workflow, 'status', 'unknown'),
|
|
'workflowId': getattr(workflow, 'id', 'unknown')
|
|
}
|
|
except Exception:
|
|
return {
|
|
'currentRound': 0,
|
|
'currentTask': 0,
|
|
'currentAction': 0,
|
|
'totalTasks': 0,
|
|
'totalActions': 0,
|
|
'workflowStatus': 'unknown',
|
|
'workflowId': 'unknown'
|
|
}
|
|
|
|
async def renderReport(self, extractedContent: Dict[str, Any], outputFormat: str, title: str, userPrompt: str = None, aiService=None, parentOperationId: Optional[str] = None) -> tuple[str, str, List[Dict[str, Any]]]:
|
|
"""
|
|
Render extracted JSON content to the specified output format.
|
|
Supports multiple documents in documents array (Phase 5: Multi-Dokument-Rendering).
|
|
Always uses unified "documents" array format.
|
|
Supports three content formats: reference, object (base64), extracted_text.
|
|
|
|
Args:
|
|
extractedContent: Structured JSON document from AI extraction
|
|
outputFormat: Target format (html, pdf, docx, txt, md, json, csv, xlsx)
|
|
title: Report title
|
|
userPrompt: User's original prompt for report generation
|
|
aiService: AI service instance for generation prompt creation
|
|
parentOperationId: Optional parent operation ID for hierarchical logging
|
|
|
|
Returns:
|
|
tuple: (rendered_content, mime_type, images_list)
|
|
images_list: List of image dicts with base64Data, altText, caption, etc.
|
|
"""
|
|
try:
|
|
# Validate JSON input
|
|
if not isinstance(extractedContent, dict):
|
|
raise ValueError("extractedContent must be a JSON dictionary")
|
|
|
|
# Unified approach: Always expect "documents" array (single doc = n=1)
|
|
if "documents" not in extractedContent:
|
|
raise ValueError("extractedContent must contain 'documents' array")
|
|
|
|
documents = extractedContent["documents"]
|
|
if len(documents) == 0:
|
|
raise ValueError("No documents found in 'documents' array")
|
|
|
|
# Phase 5: Multi-Dokument-Rendering
|
|
if len(documents) == 1:
|
|
# Single document - use existing logic
|
|
single_doc = documents[0]
|
|
if "sections" not in single_doc:
|
|
raise ValueError("Document must contain 'sections' field")
|
|
|
|
# Pass standardized schema to renderer (maintains architecture)
|
|
contentToRender = extractedContent # Pass full standardized schema
|
|
else:
|
|
# Multiple documents - merge all sections into one document for rendering
|
|
# Option: Merge all sections from all documents into a single document
|
|
all_sections = []
|
|
for doc in documents:
|
|
if isinstance(doc, dict) and "sections" in doc:
|
|
sections = doc.get("sections", [])
|
|
if isinstance(sections, list):
|
|
all_sections.extend(sections)
|
|
|
|
if not all_sections:
|
|
raise ValueError("No sections found in any document")
|
|
|
|
# Create merged document with all sections
|
|
merged_document = {
|
|
"metadata": extractedContent.get("metadata", {}),
|
|
"documents": [{
|
|
"id": "merged",
|
|
"title": title,
|
|
"filename": f"{title}.{outputFormat}",
|
|
"sections": all_sections
|
|
}]
|
|
}
|
|
contentToRender = merged_document
|
|
logger.info(f"Rendering {len(documents)} documents with {len(all_sections)} total sections")
|
|
|
|
# Get the appropriate renderer for the format
|
|
renderer = self._getFormatRenderer(outputFormat)
|
|
if not renderer:
|
|
raise ValueError(f"Unsupported output format: {outputFormat}")
|
|
|
|
# Render the JSON content directly (AI generation handled by main service)
|
|
# Renderer receives standardized schema and extracts what it needs
|
|
renderedContent, mimeType = await renderer.render(contentToRender, title, userPrompt, aiService)
|
|
|
|
# Get images from renderer if available
|
|
images = []
|
|
if hasattr(renderer, 'getRenderedImages'):
|
|
images = renderer.getRenderedImages()
|
|
|
|
return renderedContent, mimeType, images
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error rendering JSON report to {outputFormat}: {str(e)}")
|
|
raise
|
|
|
|
    async def generateDocumentWithTwoPhases(
        self,
        userPrompt: str,
        cachedContent: Optional[Dict[str, Any]] = None,
        contentParts: Optional[List[Any]] = None,
        maxSectionLength: int = 500,
        parallelGeneration: bool = True,
        progressCallback: Optional[Callable] = None
    ) -> Dict[str, Any]:
        """
        Generate a document using the two-phase approach:

        1. Generate a structure skeleton with empty sections.
        2. Generate content for each section iteratively.

        This is the core logic for document generation in AI calls.

        Args:
            userPrompt: User's original prompt.
            cachedContent: Optional extracted content cache (from the extraction
                phase); may carry an "imageDocuments" entry with pre-extracted
                images.
            contentParts: Optional list of ContentParts used by both phases.
            maxSectionLength: Maximum words for simple sections.
            parallelGeneration: Enable parallel section generation in phase 2.
            progressCallback: Optional callback(progress, total, message) for
                progress updates; progress is reported on a 0-100 scale
                (structure ~0-30, content 30-90, completion 100).

        Returns:
            Complete document structure with populated elements ready for rendering.

        Raises:
            Exception: Re-raises any failure after logging it (with traceback
                at debug level).
        """
        try:
            # Deferred imports: the generators live in sibling modules of this service.
            from modules.services.serviceGeneration.subStructureGenerator import StructureGenerator
            from modules.services.serviceGeneration.subContentGenerator import ContentGenerator

            # Phase 1: Generate structure skeleton
            if progressCallback:
                progressCallback(0, 100, "Generating document structure...")

            structureGenerator = StructureGenerator(self.services)

            # Extract imageDocuments from cachedContent if available
            existingImages = None
            if cachedContent and cachedContent.get("imageDocuments"):
                existingImages = cachedContent.get("imageDocuments")

            structure = await structureGenerator.generateStructure(
                userPrompt=userPrompt,
                documentList=None,  # Not used in current implementation
                cachedContent=cachedContent,
                contentParts=contentParts,  # Pass ContentParts for structure generation
                maxSectionLength=maxSectionLength,
                existingImages=existingImages
            )

            if progressCallback:
                progressCallback(30, 100, "Structure generated, starting content generation...")

            # Phase 2: Generate content for each section
            contentGenerator = ContentGenerator(self.services)

            # Create progress callback wrapper for content generation phase (30-90%)
            def contentProgressCallback(sectionIndex: int, totalSections: int, message: str):
                if progressCallback:
                    # Map section progress to overall progress (30% to 90%)
                    if totalSections > 0:
                        overallProgress = 30 + int(60 * (sectionIndex / totalSections))
                    else:
                        # No sections reported yet: stay at the phase-2 floor.
                        overallProgress = 30
                    progressCallback(overallProgress, 100, f"Section {sectionIndex}/{totalSections}: {message}")

            completeStructure = await contentGenerator.generateContent(
                structure=structure,
                cachedContent=cachedContent,
                userPrompt=userPrompt,
                contentParts=contentParts,  # Pass ContentParts for content generation
                progressCallback=contentProgressCallback,
                parallelGeneration=parallelGeneration
            )

            if progressCallback:
                progressCallback(100, 100, "Document generation complete")

            return completeStructure

        except Exception as e:
            logger.error(f"Error in two-phase document generation: {str(e)}")
            logger.debug(traceback.format_exc())
            raise
|
|
|
|
async def getAdaptiveExtractionPrompt(
|
|
self,
|
|
outputFormat: str,
|
|
userPrompt: str,
|
|
title: str,
|
|
aiService=None
|
|
) -> str:
|
|
"""Get adaptive extraction prompt."""
|
|
from modules.services.serviceExtraction.subPromptBuilderExtraction import buildExtractionPrompt
|
|
return await buildExtractionPrompt(
|
|
outputFormat=outputFormat,
|
|
userPrompt=userPrompt,
|
|
title=title,
|
|
aiService=aiService,
|
|
services=self.services
|
|
)
|
|
|
|
|
|
def _getFormatRenderer(self, output_format: str):
|
|
"""Get the appropriate renderer for the specified format using auto-discovery."""
|
|
try:
|
|
from .renderers.registry import getRenderer, getSupportedFormats
|
|
renderer = getRenderer(output_format, services=self.services)
|
|
|
|
if renderer:
|
|
return renderer
|
|
|
|
# Log available formats for debugging
|
|
availableFormats = getSupportedFormats()
|
|
logger.error(
|
|
f"No renderer found for format '{output_format}'. "
|
|
f"Available formats: {availableFormats}"
|
|
)
|
|
|
|
# Fallback to text renderer if no specific renderer found
|
|
logger.warning(f"Falling back to text renderer for format {output_format}")
|
|
fallbackRenderer = getRenderer('text', services=self.services)
|
|
if fallbackRenderer:
|
|
return fallbackRenderer
|
|
|
|
logger.error("Even text renderer fallback failed")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting renderer for {output_format}: {str(e)}")
|
|
# traceback is already imported at module level
|
|
logger.debug(traceback.format_exc())
|
|
return None |