diff --git a/modules/datamodels/datamodelExtraction.py b/modules/datamodels/datamodelExtraction.py
index 78ce657e..61d12977 100644
--- a/modules/datamodels/datamodelExtraction.py
+++ b/modules/datamodels/datamodelExtraction.py
@@ -1,21 +1,19 @@
 from typing import Any, Dict, List, Optional
-from dataclasses import dataclass, field
+from pydantic import BaseModel, Field
 
-@dataclass
-class ContentPart:
-    id: str
-    parentId: Optional[str]
-    label: str
-    typeGroup: str
-    mimeType: str
-    data: str
-    metadata: Dict[str, Any] = field(default_factory=dict)
+class ContentPart(BaseModel):
+    id: str = Field(description="Unique content part identifier")
+    parentId: Optional[str] = Field(default=None, description="Optional parent content part id")
+    label: str = Field(description="Human readable label of the part")
+    typeGroup: str = Field(description="Logical type group: text, table, structure, binary, ...")
+    mimeType: str = Field(description="MIME type of the part payload")
+    data: str = Field(default="", description="Primary data payload, often extracted text")
+    metadata: Dict[str, Any] = Field(default_factory=dict, description="Arbitrary metadata for the part")
 
-@dataclass
-class ExtractedContent:
-    id: str
-    parts: List[ContentPart]
-    summary: Optional[Dict[str, Any]] = None
+class ExtractedContent(BaseModel):
+    id: str = Field(description="Extraction id or source document id")
+    parts: List[ContentPart] = Field(default_factory=list, description="List of extracted parts")
+    summary: Optional[Dict[str, Any]] = Field(default=None, description="Optional extraction summary")
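Note on the datamodel change above: moving from dataclasses to pydantic makes the extraction payloads self-validating, with required fields failing fast and per-field defaults declared on the model. A minimal sketch of the new contract (not part of the patch; sample values are illustrative, and it assumes pydantic is installed):

    from modules.datamodels.datamodelExtraction import ContentPart, ExtractedContent

    # id, label, typeGroup and mimeType are required; parentId, data and
    # metadata fall back to the defaults declared on the model.
    part = ContentPart(id="p1", label="main", typeGroup="text", mimeType="text/plain")
    assert part.parentId is None and part.data == "" and part.metadata == {}

    # parts now defaults to an empty list instead of being a required field.
    doc = ExtractedContent(id="doc-1", parts=[part])
    assert doc.summary is None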
diff --git a/modules/services/__init__.py b/modules/services/__init__.py
index a8882a2c..8f7843e5 100644
--- a/modules/services/__init__.py
+++ b/modules/services/__init__.py
@@ -55,11 +55,11 @@ class Services:
         # Initialize service packages
-        from .serviceDocument.mainServiceDocumentExtraction import DocumentExtractionService
-        self.documentExtraction = PublicService(DocumentExtractionService(self))
+        from .serviceExtraction.mainServiceExtraction import ExtractionService
+        self.extraction = PublicService(ExtractionService(self))
 
-        from .serviceDocument.mainServiceDocumentGeneration import DocumentGenerationService
-        self.documentGeneration = PublicService(DocumentGenerationService(self))
+        from .serviceGeneration.mainServiceGeneration import GenerationService
+        self.generation = PublicService(GenerationService(self))
 
         from .serviceNeutralization.mainServiceNeutralization import NeutralizationService
         self.neutralization = PublicService(NeutralizationService(self))
@@ -76,14 +76,9 @@ class Services:
         from .serviceWorkflow.mainServiceWorkflow import WorkflowService
         self.workflow = PublicService(WorkflowService(self))
 
-        from .serviceWeb.mainServiceWeb import WebService
-        self.web = PublicService(WebService(self))
-
         from .serviceUtils.mainServiceUtils import UtilsService
         self.utils = PublicService(UtilsService(self))
 
-    async def extractContentFromDocument(self, prompt, document):
-        return await self.services.documentExtraction.extractContentFromDocument(prompt, document)
 
 def getInterface(user: User, workflow: ChatWorkflow) -> Services:
     return Services(user, workflow)
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index be0b0bad..9d1e4735 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -196,7 +196,7 @@ class AiService:
         processedContents: List[str] = []
 
         try:
-            extractionResult = self.extractionService.extractDocuments(documentList, extractionOptions)
+            extractionResult = await self.extractionService.extractContent(documentList, extractionOptions)
 
             def _partsToText(parts) -> str:
                 lines: List[str] = []
@@ -205,7 +205,7 @@ class AiService:
                     lines.append(p.data)
                 return "\n\n".join(lines)
 
-            if processIndividually and isinstance(extractionResult, list):
+            if isinstance(extractionResult, list):
                 for i, ec in enumerate(extractionResult):
                     try:
                         contentText = _partsToText(ec.parts)
@@ -216,9 +216,8 @@ class AiService:
                         logger.warning(f"Error aggregating extracted content: {str(e)}")
                         processedContents.append("[Error aggregating content]")
             else:
-                # pooled mode returns dict
-                parts = extractionResult.get("parts", []) if isinstance(extractionResult, dict) else []
-                contentText = _partsToText(parts)
+                # Fallback: no content
+                contentText = ""
                 if compressDocuments and len(contentText.encode("utf-8")) > 10000:
                     contentText = await self._compressContent(contentText, 10000, "document")
                 processedContents.append(contentText)
@@ -359,7 +358,7 @@ class AiService:
             "mimeType": d.mimeType
         } for d in documents]
 
-        extracted_content = await self.extractionService.extractDocuments(
+        extracted_content = await self.extractionService.extractContent(
             documentList=documentList,
             options={
                 "prompt": prompt,
             }
         )
 
-        # Get text content from extracted parts using typeGroup-aware processing
-        context = self._extractTextFromContentParts(extracted_content)
+        # Build context from list of ExtractedContent
+        if isinstance(extracted_content, list):
+            context = "\n\n---\n\n".join([
+                "\n\n".join([
+                    p.data for p in ec.parts if p.typeGroup in ["text", "table", "structure"] and p.data
+                ]) for ec in extracted_content
+            ])
+        else:
+            context = ""
 
         # Check size and reduce if needed
         full_prompt = prompt + "\n\n" + context if context else prompt
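Note on the AiService change above: both call sites now expect extractContent to return a List[ExtractedContent] and flatten it by typeGroup before prompting. A standalone sketch of that aggregation logic (buildContext is a hypothetical helper name, not part of this patch):

    from modules.datamodels.datamodelExtraction import ContentPart, ExtractedContent

    def buildContext(extracted: "list[ExtractedContent]") -> str:
        # Keep only textual part groups, join the parts of each document,
        # and separate documents with a divider, as in the hunks above.
        return "\n\n---\n\n".join(
            "\n\n".join(p.data for p in ec.parts
                        if p.typeGroup in ["text", "table", "structure"] and p.data)
            for ec in extracted
        )

    doc = ExtractedContent(id="doc-1", parts=[
        ContentPart(id="p1", label="main", typeGroup="text", mimeType="text/plain", data="Hello"),
        ContentPart(id="p2", label="img", typeGroup="binary", mimeType="image/png", data="..."),
    ])
    assert buildContext([doc]) == "Hello"  # binary parts are filtered out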
"""Initialize the document processor.""" - self._neutralizer = NeutralizationService() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None - self._serviceCenter = serviceCenter - # Store service center for access to user/workflow context when needed - self.services = None # Will be set to None to avoid circular dependency - - self.supportedTypes: Dict[str, Callable[[bytes, str, str], Awaitable[List[ContentItem]]]] = { - # Text and data files - 'text/plain': self._processText, - 'text/csv': self._processCsv, - 'application/json': self._processJson, - 'application/xml': self._processXml, - 'text/html': self._processHtml, - 'image/svg+xml': self._processSvg, - - # Programming languages - 'application/javascript': self._processText, - 'application/typescript': self._processText, - 'text/jsx': self._processText, - 'text/tsx': self._processText, - 'text/x-python': self._processText, - 'text/x-java-source': self._processText, - 'text/x-c': self._processText, - 'text/x-c++src': self._processText, - 'text/x-c++hdr': self._processText, - 'text/x-csharp': self._processText, - 'application/x-httpd-php': self._processText, - 'text/x-ruby': self._processText, - 'text/x-go': self._processText, - 'text/x-rust': self._processText, - 'text/x-swift': self._processText, - 'text/x-kotlin': self._processText, - 'text/x-scala': self._processText, - 'text/x-r': self._processText, - 'text/x-matlab': self._processText, - 'text/x-perl': self._processText, - 'application/x-sh': self._processText, - 'application/x-powershell': self._processText, - 'application/x-msdos-program': self._processText, - 'text/vbscript': self._processText, - 'text/x-lua': self._processText, - 'application/sql': self._processText, - 'application/dart': self._processText, - 'text/x-elm': self._processText, - 'text/x-clojure': self._processText, - 'text/x-haskell': self._processText, - 'text/x-fsharp': self._processText, - 'text/x-ocaml': self._processText, - - # Web technologies - 'text/css': self._processText, - 'text/x-scss': self._processText, - 'text/x-sass': self._processText, - 'text/x-less': self._processText, - 'text/x-vue': self._processText, - 'text/x-svelte': self._processText, - 'text/x-astro': self._processText, - - # Configuration and build files - 'application/x-yaml': self._processText, - 'application/toml': self._processText, - 'text/x-dockerfile': self._processText, - 'text/x-makefile': self._processText, - 'text/x-cmake': self._processText, - 'text/x-gradle': self._processText, - 'text/x-maven': self._processText, - - # Documentation and markup - 'text/markdown': self._processText, - 'text/x-rst': self._processText, - 'application/x-tex': self._processText, - 'text/x-bibtex': self._processText, - 'text/asciidoc': self._processText, - 'text/x-wiki': self._processText, - - # Images - 'image/jpeg': self._processImage, - 'image/png': self._processImage, - 'image/gif': self._processImage, - 'image/webp': self._processImage, - 'image/bmp': self._processImage, - 'image/tiff': self._processImage, - 'image/x-icon': self._processImage, - - # Documents - 'application/pdf': self._processPdf, - 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': self._processDocx, - 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': self._processXlsx, - 'application/vnd.openxmlformats-officedocument.presentationml.presentation': self._processPptx, - 'application/vnd.oasis.opendocument.text': self._processText, - 'application/vnd.oasis.opendocument.spreadsheet': self._processText, - 
'application/vnd.oasis.opendocument.presentation': self._processText, - - # Legacy Office formats - 'application/msword': self._processLegacyDoc, - 'application/vnd.ms-excel': self._processLegacyXls, - 'application/vnd.ms-powerpoint': self._processLegacyPpt - } - - self.chunkSizes = { - "text": 40000, # General text content - "plain": 40000, # Plain text - "csv": 40000, # CSV data - "json": 40000, # JSON data - "xml": 40000, # XML data - "html": 40000, # HTML content - "markdown": 40000, # Markdown content - "code": 80000, # Programming code (increased for better preservation) - "script": 80000, # Script files (increased for better preservation) - "javascript": 80000, # JavaScript files specifically - "typescript": 80000, # TypeScript files specifically - "config": 40000, # Configuration files - "image": 1024 * 1024, # 1MB for images - "video": 5 * 1024 * 1024, # 5MB for video chunks - "binary": 1024 * 1024, # 1MB for binary data - "pdf": 40000, # PDF text content - "docx": 40000, # Word document text - "xlsx": 40000, # Excel data - "svg": 40000 # SVG content - } - - def _robustTextDecode(self, fileData: bytes, fileName: str = "unknown") -> str: - """ - Robustly decode text data with multiple encoding fallbacks. - - Args: - fileData: Raw bytes to decode - fileName: fileName for logging purposes - - Returns: - Decoded text string - - Raises: - FileProcessingError: If all decoding attempts fail - """ - # Try multiple encoding options in order of likelihood - encodings_to_try = ['utf-8', 'windows-1252', 'iso-8859-1', 'latin-1', 'cp1252'] - content = None - - # First try UTF-8 (most common) - try: - content = fileData.decode('utf-8') - - return content - except UnicodeDecodeError: - pass - - # Try other encodings - for encoding in encodings_to_try[1:]: - try: - content = fileData.decode(encoding) - - return content - except UnicodeDecodeError: - continue - - # If all encodings fail, try with error handling - try: - # Try with chardet for automatic detection - import chardet - detected = chardet.detect(fileData) - if detected['confidence'] > 0.7: - detected_encoding = detected['encoding'] - content = fileData.decode(detected_encoding, errors='replace') - - return content - else: - # Last resort: decode with replacement characters - content = fileData.decode('utf-8', errors='replace') - logger.warning(f"{fileName}: decoded with UTF-8 and replacement characters due to low encoding confidence") - return content - except ImportError: - # chardet not available, use replacement characters - content = fileData.decode('utf-8', errors='replace') - logger.warning(f"{fileName}: decoded with UTF-8 and replacement characters (chardet not available)") - return content - - # This should never be reached, but just in case - raise FileProcessingError(f"Failed to decode {fileName} with any encoding") - - def _loadPdfExtractor(self): - """Loads PDF extraction libraries when needed""" - global pdfExtractorLoaded - if not pdfExtractorLoaded: - try: - global PyPDF2, fitz - import PyPDF2 - import fitz # PyMuPDF for more extensive PDF processing - pdfExtractorLoaded = True - logger.debug("PDF extraction libraries successfully loaded") - except ImportError as e: - logger.warning(f"PDF extraction libraries could not be loaded: {e}") - - def _loadOfficeExtractor(self): - """Loads Office document extraction libraries when needed""" - global officeExtractorLoaded - if not officeExtractorLoaded: - try: - global docx, openpyxl - import docx # python-docx for Word documents - import openpyxl # for Excel files - 
officeExtractorLoaded = True - logger.debug("Office extraction libraries successfully loaded") - except ImportError as e: - logger.warning(f"Office extraction libraries could not be loaded: {e}") - - def _loadImageProcessor(self): - """Loads image processing libraries when needed""" - global imageProcessorLoaded - if not imageProcessorLoaded: - try: - global PIL, Image - from PIL import Image - imageProcessorLoaded = True - logger.debug("Image processing libraries successfully loaded") - except ImportError as e: - logger.warning(f"Image processing libraries could not be loaded: {e}") - - - - async def processFileData(self, fileData: bytes, fileName: str, mimeType: str, base64Encoded: bool = False, prompt: str = None, documentId: str = None, enableAI: bool = True) -> ExtractedContent: - """ - Process file data directly and extract its contents with optional AI processing. - - Args: - fileData: Raw file data as bytes - fileName: Name of the file - mimeType: MIME type of the file - base64Encoded: Whether the data is base64 encoded - prompt: Prompt for AI content extraction - documentId: Optional document ID - enableAI: Whether to enable AI processing (default: True) - - Returns: - ExtractedContent containing the processed content - - Raises: - FileProcessingError: If document processing fails - """ - try: - # Decode base64 if needed - if base64Encoded: - fileData = base64.b64decode(fileData) - # Use subDocumentUtility for mime type detection - if mimeType == "application/octet-stream": - mimeType = detectMimeTypeFromData(fileData, fileName, self._serviceCenter) - # Process document based on type - if mimeType not in self.supportedTypes: - contentItems = await self._processBinary(fileData, fileName, mimeType) - else: - processor = self.supportedTypes[mimeType] - contentItems = await processor(fileData, fileName, mimeType) - - # Process with AI if prompt provided and AI is enabled - if enableAI and prompt and contentItems: - try: - # Process each content item with AI - processedItems = await self._aiDataExtraction(contentItems, prompt) - contentItems = processedItems - except Exception as e: - logger.error(f"Error processing content with AI: {str(e)}") - elif not enableAI: - logger.debug(f"AI processing disabled for {fileName}, returning raw extracted content") - - return ExtractedContent( - id=documentId if documentId else str(uuid.uuid4()), - contents=contentItems - ) - - except Exception as e: - logger.error(f"Error processing file data: {str(e)}") - raise FileProcessingError(f"Failed to process file data: {str(e)}") - - - - async def _processText(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: - """Process text document with robust encoding detection and complete content extraction""" - try: - content = self._robustTextDecode(fileData, fileName) - - # Validate that we got the complete content - if not content or len(content.strip()) == 0: - logger.warning(f"Empty content extracted from {fileName}") - return [ContentItem( - label="empty", - data="[Empty file or no readable content]", - metadata=ContentMetadata( - size=0, - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )] - - # Log content size for debugging - content_size = len(content.encode('utf-8')) - - - # Use subDocumentUtility for mime type - mime_type = getMimeTypeFromExtension(getFileExtension(fileName)) - return [ContentItem( - label="main", - data=content, - metadata=ContentMetadata( - size=content_size, - pages=1, - mimeType=mime_type, - base64Encoded=False - ) - )] - except Exception as 
-            logger.error(f"Error processing text document: {str(e)}")
-            raise FileProcessingError(f"Failed to process text document: {str(e)}")
-
-    async def _processCsv(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process CSV document with robust encoding detection"""
-        try:
-            content = self._robustTextDecode(fileData, fileName)
-            mime_type = getMimeTypeFromExtension(getFileExtension(fileName))
-            return [ContentItem(
-                label="main",
-                data=content,
-                metadata=ContentMetadata(
-                    size=len(content.encode('utf-8')),
-                    pages=1,
-                    mimeType=mime_type,
-                    base64Encoded=False
-                )
-            )]
-        except Exception as e:
-            logger.error(f"Error processing CSV document: {str(e)}")
-            raise FileProcessingError(f"Failed to process CSV document: {str(e)}")
-
-    async def _processJson(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process JSON document with robust encoding detection"""
-        try:
-            content = self._robustTextDecode(fileData, fileName)
-            jsonData = json.loads(content)
-            mime_type = getMimeTypeFromExtension(getFileExtension(fileName))
-            return [ContentItem(
-                label="main",
-                data=content,
-                metadata=ContentMetadata(
-                    size=len(content.encode('utf-8')),
-                    pages=1,
-                    mimeType=mime_type,
-                    base64Encoded=False
-                )
-            )]
-        except Exception as e:
-            logger.error(f"Error processing JSON document: {str(e)}")
-            raise FileProcessingError(f"Failed to process JSON document: {str(e)}")
-
-    async def _processXml(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process XML document with robust encoding detection"""
-        try:
-            content = self._robustTextDecode(fileData, fileName)
-            mime_type = getMimeTypeFromExtension(getFileExtension(fileName))
-            return [ContentItem(
-                label="main",
-                data=content,
-                metadata=ContentMetadata(
-                    size=len(content.encode('utf-8')),
-                    pages=1,
-                    mimeType=mime_type,
-                    base64Encoded=False
-                )
-            )]
-        except Exception as e:
-            logger.error(f"Error processing XML document: {str(e)}")
-            raise FileProcessingError(f"Failed to process XML document: {str(e)}")
-
-    async def _processHtml(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process HTML document with robust encoding detection"""
-        try:
-            content = self._robustTextDecode(fileData, fileName)
-            mime_type = getMimeTypeFromExtension(getFileExtension(fileName))
-            return [ContentItem(
-                label="main",
-                data=content,
-                metadata=ContentMetadata(
-                    size=len(content.encode('utf-8')),
-                    pages=1,
-                    mimeType=mime_type,
-                    base64Encoded=False
-                )
-            )]
-        except Exception as e:
-            logger.error(f"Error processing HTML document: {str(e)}")
-            raise FileProcessingError(f"Failed to process HTML document: {str(e)}")
-
-    async def _processSvg(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process SVG document with robust encoding detection and meaningful content extraction"""
-        try:
-            content = self._robustTextDecode(fileData, fileName)
-
-            # Check if it's actually SVG content
-            if "
-
-    async def _processImage(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process image document"""
-        try:
-            self._loadImageProcessor()
-            if not imageProcessorLoaded:
-                raise FileProcessingError("Image processing libraries not available")
-
-            with io.BytesIO(fileData) as imgStream:
-                img = Image.open(imgStream)
-
-                # For GIF files, provide descriptive information instead of AI processing
-                if mimeType == "image/gif":
-                    try:
-                        frame_count = getattr(img, 'n_frames', 1)
-                        duration = getattr(img, 'duration', 0)
-
-                        # Create a descriptive text about the GIF
-                        gif_description = f"GIF Image Analysis:\n"
-                        gif_description += f"- Dimensions: {img.width} x {img.height} pixels\n"
-                        gif_description += f"- Frame count: {frame_count}\n"
-                        gif_description += f"- Color mode: {img.mode}\n"
-                        if duration > 0:
-                            gif_description += f"- Duration: {duration}ms\n"
-                        gif_description += f"- File size: {len(fileData)} bytes\n"
-                        gif_description += f"- Format: {img.format}\n\n"
-                        gif_description += f"Note: This is an animated GIF image. The AI cannot directly analyze image content, but the file contains {frame_count} frame(s) of animation."
-
-                        return [ContentItem(
-                            label="gif_analysis",
-                            data=gif_description,
-                            metadata=ContentMetadata(
-                                size=len(gif_description.encode('utf-8')),
-                                width=img.width,
-                                height=img.height,
-                                colorMode=img.mode,
-                                mimeType="text/plain",
-                                base64Encoded=False
-                            )
-                        )]
-                    except Exception as gifError:
-                        logger.warning(f"GIF processing failed: {str(gifError)}")
-                        # Fallback to basic description
-                        pass
-
-                metadata = ContentMetadata(
-                    size=len(fileData),
-                    width=img.width,
-                    height=img.height,
-                    colorMode=img.mode,
-                    mimeType=mimeType,
-                    base64Encoded=True
-                )
-
-                # Convert image to base64 for storage
-                imgStream.seek(0)
-                imgData = base64.b64encode(imgStream.read()).decode('utf-8')
-
-                return [ContentItem(
-                    label="image",
-                    data=imgData,
-                    metadata=metadata
-                )]
-        except Exception as e:
-            logger.error(f"Error processing image document: {str(e)}")
-            raise FileProcessingError(f"Failed to process image document: {str(e)}")
-
-    async def _processPdf(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process PDF document"""
-        try:
-            self._loadPdfExtractor()
-            if not pdfExtractorLoaded:
-                raise FileProcessingError("PDF extraction libraries not available")
-
-            contentItems = []
-
-            with io.BytesIO(fileData) as pdfStream:
-                # Extract text with PyPDF2
-                pdfReader = PyPDF2.PdfReader(pdfStream)
-                metadata = ContentMetadata(
-                    size=len(fileData),
-                    pages=len(pdfReader.pages),
-                    mimeType="application/pdf",
-                    base64Encoded=False
-                )
-
-                # Extract text from all pages
-                for pageNum in range(len(pdfReader.pages)):
-                    page = pdfReader.pages[pageNum]
-                    pageText = page.extract_text()
-                    if pageText:
-                        contentItems.append(ContentItem(
-                            label=f"page_{pageNum + 1}",
-                            data=pageText,
-                            metadata=ContentMetadata(
-                                size=len(pageText.encode('utf-8')),
-                                pages=1,
-                                mimeType="text/plain",
-                                base64Encoded=False
-                            )
-                        ))
-
-                # Extract images with PyMuPDF
-                pdfStream.seek(0)
-                doc = fitz.open(stream=pdfStream, filetype="pdf")
-                for pageNum in range(len(doc)):
-                    page = doc[pageNum]
-                    for imgIndex, imgInfo in enumerate(page.get_images(full=True)):
-                        try:
-                            xref = imgInfo[0]
-                            baseImage = doc.extract_image(xref)
-                            if baseImage:
-                                imageBytes = baseImage.get("image", b"")
-                                imageExt = baseImage.get("ext", "png")
-
-                                if imageBytes:
-                                    contentItems.append(ContentItem(
-                                        label=f"image_{pageNum + 1}_{imgIndex}",
-                                        data=base64.b64encode(imageBytes).decode('utf-8'),
-                                        metadata=ContentMetadata(
-                                            size=len(imageBytes),
-                                            pages=1,
-                                            mimeType=f"image/{imageExt}",
-                                            base64Encoded=True
-                                        )
-                                    ))
-                        except Exception as imgE:
-                            logger.warning(f"Error extracting image {imgIndex} on page {pageNum + 1}: {str(imgE)}")
-
-                doc.close()
-
-            return contentItems
-        except Exception as e:
-            logger.error(f"Error processing PDF document: {str(e)}")
-            raise FileProcessingError(f"Failed to process PDF document: {str(e)}")
-
-    async def _processDocx(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process Word document with enhanced formatting preservation"""
-        try:
-            self._loadOfficeExtractor()
-            if not officeExtractorLoaded:
-                raise FileProcessingError("Office extraction libraries not available")
-
-            contentItems = []
-
-            with io.BytesIO(fileData) as docxStream:
-                doc = docx.Document(docxStream)
-
-                # Extract document properties
-                doc_properties = []
-                if doc.core_properties.title:
-                    doc_properties.append(f"Title: {doc.core_properties.title}")
-                if doc.core_properties.author:
-                    doc_properties.append(f"Author: {doc.core_properties.author}")
-                if doc.core_properties.subject:
-                    doc_properties.append(f"Subject: {doc.core_properties.subject}")
-                if doc.core_properties.keywords:
-                    doc_properties.append(f"Keywords: {doc.core_properties.keywords}")
-                if doc.core_properties.comments:
-                    doc_properties.append(f"Comments: {doc.core_properties.comments}")
-
-                # Extract main content with formatting
-                main_content = []
-
-                # Process paragraphs with formatting
-                for para in doc.paragraphs:
-                    if para.text.strip():
-                        # Get paragraph style
-                        style_name = para.style.name if para.style else "Normal"
-
-                        # Check for heading styles
-                        if style_name.startswith('Heading'):
-                            level = style_name.replace('Heading ', '')
-                            main_content.append(f"\n{'#' * int(level)} {para.text}")
-                        else:
-                            # Check for bold, italic, underline formatting
-                            formatted_text = para.text
-                            if para.runs:
-                                # Process individual runs for formatting
-                                run_texts = []
-                                for run in para.runs:
-                                    run_text = run.text
-                                    if run.bold:
-                                        run_text = f"**{run_text}**"
-                                    if run.italic:
-                                        run_text = f"*{run_text}*"
-                                    if run.underline:
-                                        run_text = f"__{run_text}__"
-                                    run_texts.append(run_text)
-                                formatted_text = ''.join(run_texts)
-
-                            main_content.append(formatted_text)
-
-                # Extract tables with better formatting
-                table_count = 0
-                for table in doc.tables:
-                    table_count += 1
-                    main_content.append(f"\n\n--- Table {table_count} ---")
-
-                    # Get table headers (first row)
-                    if table.rows:
-                        header_row = table.rows[0]
-                        headers = [cell.text.strip() for cell in header_row.cells]
-                        main_content.append("| " + " | ".join(headers) + " |")
-                        main_content.append("|" + "|".join(["---"] * len(headers)) + "|")
-
-                        # Process data rows
-                        for row in table.rows[1:]:
-                            row_data = [cell.text.strip() for cell in row.cells]
-                            main_content.append("| " + " | ".join(row_data) + " |")
-
-                    main_content.append("--- End Table ---\n")
-
-                # Extract headers and footers if available
-                try:
-                    # Check for headers and footers in sections
-                    for section in doc.sections:
-                        # Header
-                        if section.header:
-                            header_text = []
-                            for para in section.header.paragraphs:
-                                if para.text.strip():
-                                    header_text.append(f"[Header] {para.text}")
-                            if header_text:
-                                main_content.insert(0, "\n".join(header_text) + "\n")
-
-                        # Footer
-                        if section.footer:
-                            footer_text = []
-                            for para in section.footer.paragraphs:
-                                if para.text.strip():
-                                    footer_text.append(f"[Footer] {para.text}")
-                            if footer_text:
-                                main_content.append("\n" + "\n".join(footer_text))
-                except Exception as header_footer_error:
-                    logger.debug(f"Could not extract headers/footers: {header_footer_error}")
-
-                # Extract comments if available
-                try:
-                    comments = []
-                    for comment in doc.part.comments_part.comments if doc.part.comments_part else []:
-                        comment_text = comment.text.strip()
-                        if comment_text:
-                            comments.append(f"[Comment] {comment_text}")
-
-                    if comments:
-                        main_content.append("\n\n--- Comments ---")
-                        main_content.extend(comments)
-                        main_content.append("--- End Comments ---")
-                except Exception as comment_error:
-                    logger.debug(f"Could not extract comments: {comment_error}")
-
-                # Combine all content
-                if doc_properties:
-                    main_content.insert(0, "--- Document Properties ---\n" + "\n".join(doc_properties) + "\n--- End Properties ---\n")
-
-                final_content = "\n".join(main_content)
-
-                # Create main content item
-                contentItems.append(ContentItem(
-                    label="main",
-                    data=final_content,
-                    metadata=ContentMetadata(
-                        size=len(final_content.encode('utf-8')),
-                        pages=len(doc.paragraphs),
-                        mimeType="text/markdown",  # Use markdown for better formatting
-                        base64Encoded=False
-                    )
-                ))
-
-                # Create separate content item for tables only (if tables exist)
-                if table_count > 0:
-                    table_content = []
-                    for i, table in enumerate(doc.tables):
-                        table_content.append(f"Table {i+1}:")
-                        if table.rows:
-                            # CSV format for tables
-                            for row in table.rows:
-                                row_data = [f'"{cell.text.strip()}"' for cell in row.cells]
-                                table_content.append(",".join(row_data))
-                        table_content.append("")  # Empty line between tables
-
-                    table_text = "\n".join(table_content)
-                    contentItems.append(ContentItem(
-                        label="tables",
-                        data=table_text,
-                        metadata=ContentMetadata(
-                            size=len(table_text.encode('utf-8')),
-                            pages=1,
-                            mimeType="text/csv",
-                            base64Encoded=False
-                        )
-                    ))
-
-                # Create separate content item for document structure
-                structure_info = []
-                structure_info.append(f"Document Structure:")
-                structure_info.append(f"- Paragraphs: {len(doc.paragraphs)}")
-                structure_info.append(f"- Tables: {table_count}")
-                structure_info.append(f"- Sections: {len(doc.sections)}")
-
-                # Count different paragraph styles
-                style_counts = {}
-                for para in doc.paragraphs:
-                    style_name = para.style.name if para.style else "Normal"
-                    style_counts[style_name] = style_counts.get(style_name, 0) + 1
-
-                for style, count in style_counts.items():
-                    structure_info.append(f"- {style}: {count}")
-
-                structure_text = "\n".join(structure_info)
-                contentItems.append(ContentItem(
-                    label="structure",
-                    data=structure_text,
-                    metadata=ContentMetadata(
-                        size=len(structure_text.encode('utf-8')),
-                        pages=1,
-                        mimeType="text/plain",
-                        base64Encoded=False
-                    )
-                ))
-
-            return contentItems
-
-        except Exception as e:
-            logger.error(f"Error processing Word document: {str(e)}")
-            raise FileProcessingError(f"Failed to process Word document: {str(e)}")
-
-    async def _processXlsx(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]:
-        """Process Excel document with enhanced table extraction and metadata"""
-        try:
-            self._loadOfficeExtractor()
-            if not officeExtractorLoaded:
-                raise FileProcessingError("Office extraction libraries not available")
-
-            contentItems = []
-
-            with io.BytesIO(fileData) as xlsxStream:
-                try:
-                    workbook = openpyxl.load_workbook(xlsxStream, data_only=True)
-
-                except Exception as load_error:
-                    logger.error(f"Failed to load Excel workbook {fileName}: {str(load_error)}")
-                    raise FileProcessingError(f"Failed to load Excel workbook: {str(load_error)}")
-
-                # Extract workbook properties safely
-                workbook_props = []
-                try:
-                    if hasattr(workbook, 'properties'):
-                        props = workbook.properties
-
-
-                        # Log all available attributes for debugging
-                        for attr in dir(props):
-                            if not attr.startswith('_'):  # Skip private attributes
-                                try:
-                                    value = getattr(props, attr)
-                                    if value is not None:
-                                        pass
-                                except Exception as attr_error:
-                                    logger.debug(f"Could not read property {attr}: {str(attr_error)}")
-
-                        # Check each property safely before accessing
-                        if hasattr(props, 'title') and props.title:
-                            workbook_props.append(f"Title: {props.title}")
-                        if hasattr(props, 'creator') and props.creator:  # 'creator' is the correct attribute
workbook_props.append(f"Author: {props.creator}") - if hasattr(props, 'subject') and props.subject: - workbook_props.append(f"Subject: {props.subject}") - if hasattr(props, 'keywords') and props.keywords: - workbook_props.append(f"Keywords: {props.keywords}") - if hasattr(props, 'comments') and props.comments: - workbook_props.append(f"Comments: {props.comments}") - if hasattr(props, 'category') and props.category: - workbook_props.append(f"Category: {props.category}") - if hasattr(props, 'description') and props.description: - workbook_props.append(f"Description: {props.description}") - if hasattr(props, 'lastModifiedBy') and props.lastModifiedBy: - workbook_props.append(f"Last Modified By: {props.lastModifiedBy}") - if hasattr(props, 'created') and props.created: - workbook_props.append(f"Created: {props.created}") - if hasattr(props, 'modified') and props.modified: - workbook_props.append(f"Modified: {props.modified}") - - # Try alternative property names that might exist - if hasattr(props, 'author') and props.author: # Some versions use 'author' - workbook_props.append(f"Author (alt): {props.author}") - if hasattr(props, 'manager') and props.manager: - workbook_props.append(f"Manager: {props.manager}") - if hasattr(props, 'company') and props.company: - workbook_props.append(f"Company: {props.company}") - if hasattr(props, 'status') and props.status: - workbook_props.append(f"Status: {props.status}") - if hasattr(props, 'revision') and props.revision: - workbook_props.append(f"Revision: {props.revision}") - - else: - # Try to find properties in other locations - for attr in dir(workbook): - if not attr.startswith('_') and 'prop' in attr.lower(): - pass - except Exception as props_error: - logger.warning(f"Could not extract workbook properties: {str(props_error)}") - workbook_props = [] - - # Create workbook overview content item - overview_content = [] - overview_content.append("Excel Workbook Overview") - overview_content.append("=" * 30) - overview_content.append(f"Total Sheets: {len(workbook.sheetnames)}") - overview_content.append(f"Sheet Names: {', '.join(workbook.sheetnames)}") - - if workbook_props: - overview_content.append("\nWorkbook Properties:") - overview_content.extend(workbook_props) - - overview_text = "\n".join(overview_content) - contentItems.append(ContentItem( - label="overview", - data=overview_text, - metadata=ContentMetadata( - size=len(overview_text.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - - # Process each sheet - for sheetIndex, sheetName in enumerate(workbook.sheetnames): - try: - sheet = workbook[sheetName] - logger.debug(f"Processing sheet {sheetIndex + 1}: {sheetName}") - - # Get sheet metadata - sheet_metadata = [] - sheet_metadata.append(f"Sheet: {sheetName}") - - try: - sheet_metadata.append(f"Dimensions: {sheet.dimensions}") - sheet_metadata.append(f"Max Row: {sheet.max_row}") - sheet_metadata.append(f"Max Column: {sheet.max_column}") - except Exception as dim_error: - logger.warning(f"Could not get sheet dimensions for {sheetName}: {str(dim_error)}") - sheet_metadata.append("Dimensions: Unable to determine") - sheet_metadata.append("Max Row: Unknown") - sheet_metadata.append("Max Column: Unknown") - - # Check for sheet properties safely - try: - if hasattr(sheet, 'sheet_properties'): - sheet_props = sheet.sheet_properties - if hasattr(sheet_props, 'tabColor') and sheet_props.tabColor: - sheet_metadata.append(f"Tab Color: {sheet_props.tabColor}") - if hasattr(sheet_props, 'hidden') and sheet_props.hidden: - 
sheet_metadata.append("Hidden: Yes") - if hasattr(sheet_props, 'name') and sheet_props.name: - sheet_metadata.append(f"Internal Name: {sheet_props.name}") - except Exception as sheet_props_error: - logger.debug(f"Could not extract sheet properties for {sheetName}: {str(sheet_props_error)}") - - # Extract data from sheet - sheet_data = [] - - try: - # Find the actual data range (skip empty rows/columns) - min_row = sheet.min_row - max_row = sheet.max_row - min_col = sheet.min_column - max_col = sheet.max_column - - # Adjust for empty sheets - if max_row == 0 or max_col == 0: - sheet_metadata.append("Content: Empty sheet") - sheet_data.append("(Empty sheet)") - else: - # Extract all data with proper CSV formatting - for row_num in range(min_row, max_row + 1): - row_data = [] - for col_num in range(min_col, max_col + 1): - try: - cell = sheet.cell(row=row_num, column=col_num) - cell_value = cell.value - - # Handle different data types - if cell_value is None: - row_data.append("") - elif isinstance(cell_value, (int, float)): - row_data.append(str(cell_value)) - elif isinstance(cell_value, datetime): - row_data.append(cell_value.strftime("%Y-%m-%d %H:%M:%S")) - else: - # Escape quotes and wrap in quotes for CSV - cell_str = str(cell_value).replace('"', '""') - row_data.append(f'"{cell_str}"') - except Exception as cell_error: - logger.debug(f"Error processing cell at row {row_num}, col {col_num}: {str(cell_error)}") - row_data.append("(Error reading cell)") - - sheet_data.append(",".join(row_data)) - - sheet_metadata.append(f"Data Rows: {len(sheet_data)}") - sheet_metadata.append(f"Data Columns: {max_col - min_col + 1}") - except Exception as data_error: - logger.warning(f"Could not extract data from sheet {sheetName}: {str(data_error)}") - sheet_metadata.append("Content: Error extracting data") - sheet_data.append(f"(Error: {str(data_error)})") - - # Create sheet content item - sheet_content = "\n".join(sheet_metadata) + "\n\n" + "\n".join(sheet_data) - contentItems.append(ContentItem( - label=f"sheet_{sheetIndex + 1}_{sheetName}", - data=sheet_content, - metadata=ContentMetadata( - size=len(sheet_content.encode('utf-8')), - pages=1, - mimeType="text/csv", - base64Encoded=False - ) - )) - - # Create separate CSV file for each sheet (clean format) - if sheet_data and sheet_data[0].strip() and not sheet_data[0].startswith("(Error"): - # Create clean CSV without metadata - csv_content = "\n".join(sheet_data) - contentItems.append(ContentItem( - label=f"csv_{sheetIndex + 1}_{sheetName}", - data=csv_content, - metadata=ContentMetadata( - size=len(csv_content.encode('utf-8')), - pages=1, - mimeType="text/csv", - base64Encoded=False - ) - )) - - except Exception as sheet_error: - logger.error(f"Error processing sheet {sheetName}: {str(sheet_error)}") - # Create error content item for this sheet - error_content = f"Error processing sheet: {sheetName}\nError: {str(sheet_error)}" - contentItems.append(ContentItem( - label=f"error_sheet_{sheetIndex + 1}_{sheetName}", - data=error_content, - metadata=ContentMetadata( - size=len(error_content.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - - # Create summary content item - try: - summary_content = [] - summary_content.append("Excel Processing Summary") - summary_content.append("=" * 30) - summary_content.append(f"Total Sheets Processed: {len(workbook.sheetnames)}") - - total_rows = 0 - total_cells = 0 - for sheetName in workbook.sheetnames: - try: - sheet = workbook[sheetName] - if hasattr(sheet, 'max_row') and 
hasattr(sheet, 'max_column'): - if sheet.max_row > 0 and sheet.max_column > 0: - sheet_rows = sheet.max_row - sheet_cells = sheet.max_row * sheet.max_column - total_rows += sheet_rows - total_cells += sheet_cells - summary_content.append(f"- {sheetName}: {sheet_rows} rows, {sheet_cells} cells") - except Exception as summary_error: - logger.debug(f"Could not get summary for sheet {sheetName}: {str(summary_error)}") - summary_content.append(f"- {sheetName}: Error getting summary") - - summary_content.append(f"\nTotal Rows: {total_rows}") - summary_content.append(f"Total Cells: {total_cells}") - - summary_text = "\n".join(summary_content) - contentItems.append(ContentItem( - label="summary", - data=summary_text, - metadata=ContentMetadata( - size=len(summary_text.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - except Exception as summary_error: - logger.warning(f"Could not create summary: {str(summary_error)}") - - return contentItems - - except Exception as e: - logger.error(f"Error processing Excel document: {str(e)}") - raise FileProcessingError(f"Failed to process Excel document: {str(e)}") - - async def _processLegacyDoc(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: - """Process legacy Word .doc document""" - try: - # Try to use antiword or similar tools for .doc files - # For now, we'll provide a basic binary extraction with metadata - contentItems = [] - - # Create a basic content item explaining the limitation - info_content = f"""Legacy Word Document (.doc) - {fileName} - - Note: This is a legacy .doc format file. For better content extraction, - consider converting to .docx format. - - File size: {len(fileData)} bytes - Format: Microsoft Word 97-2003 Document - - Content extraction from .doc files requires specialized tools like: - - antiword (Linux/Unix) - - catdoc (Linux/Unix) - - Microsoft Word (for conversion) - - The raw binary content is available but not human-readable.""" - - contentItems.append(ContentItem( - label="info", - data=info_content, - metadata=ContentMetadata( - size=len(info_content.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - - # Also provide the binary content for potential processing - contentItems.append(ContentItem( - label="binary", - data=base64.b64encode(fileData).decode('utf-8'), - metadata=ContentMetadata( - size=len(fileData), - mimeType=mimeType, - base64Encoded=True - ) - )) - - return contentItems - - except Exception as e: - logger.error(f"Error processing legacy Word document: {str(e)}") - raise FileProcessingError(f"Failed to process legacy Word document: {str(e)}") - - async def _processLegacyXls(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: - """Process legacy Excel .xls document""" - try: - # Try to use xlrd or similar tools for .xls files - # For now, we'll provide a basic binary extraction with metadata - contentItems = [] - - # Create a basic content item explaining the limitation - info_content = f"""Legacy Excel Document (.xls) - {fileName} - - Note: This is a legacy .xls format file. For better content extraction, - consider converting to .xlsx format. 
- - File size: {len(fileData)} bytes - Format: Microsoft Excel 97-2003 Workbook - - Content extraction from .xls files requires specialized tools like: - - xlrd (Python library) - - Microsoft Excel (for conversion) - - LibreOffice (for conversion) - - The raw binary content is available but not human-readable.""" - - contentItems.append(ContentItem( - label="info", - data=info_content, - metadata=ContentMetadata( - size=len(info_content.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - - # Also provide the binary content for potential processing - contentItems.append(ContentItem( - label="binary", - data=base64.b64encode(fileData).decode('utf-8'), - metadata=ContentMetadata( - size=len(fileData), - mimeType=mimeType, - base64Encoded=True - ) - )) - - return contentItems - - except Exception as e: - logger.error(f"Error processing legacy Excel document: {str(e)}") - raise FileProcessingError(f"Failed to process legacy Excel document: {str(e)}") - - async def _processLegacyPpt(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: - """Process legacy PowerPoint .ppt document""" - try: - # Try to use python-pptx or similar tools for .ppt files - # For now, we'll provide a basic binary extraction with metadata - contentItems = [] - - # Create a basic content item explaining the limitation - info_content = f"""Legacy PowerPoint Document (.ppt) - {fileName} - - Note: This is a legacy .ppt format file. For better content extraction, - consider converting to .pptx format. - - File size: {len(fileData)} bytes - Format: Microsoft PowerPoint 97-2003 Presentation - - Content extraction from .ppt files requires specialized tools like: - - python-pptx (limited support for .ppt) - - Microsoft PowerPoint (for conversion) - - LibreOffice (for conversion) - - The raw binary content is available but not human-readable.""" - - contentItems.append(ContentItem( - label="info", - data=info_content, - metadata=ContentMetadata( - size=len(info_content.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - - # Also provide the binary content for potential processing - contentItems.append(ContentItem( - label="binary", - data=base64.b64encode(fileData).decode('utf-8'), - metadata=ContentMetadata( - size=len(fileData), - mimeType=mimeType, - base64Encoded=True - ) - )) - - return contentItems - - except Exception as e: - logger.error(f"Error processing legacy PowerPoint document: {str(e)}") - raise FileProcessingError(f"Failed to process legacy PowerPoint document: {str(e)}") - - async def _processPptx(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: - """Process PowerPoint document""" - try: - self._loadOfficeExtractor() - if not officeExtractorLoaded: - raise FileProcessingError("Office extraction libraries not available") - - contentItems = [] - - try: - # Try to use python-pptx for PowerPoint processing - from pptx import Presentation - - with io.BytesIO(fileData) as pptxStream: - prs = Presentation(pptxStream) - - for slideNum, slide in enumerate(prs.slides): - slideText = [] - - # Extract text from shapes - for shape in slide.shapes: - if hasattr(shape, "text") and shape.text: - slideText.append(shape.text) - - # Extract text from text boxes - for shape in slide.shapes: - if shape.has_text_frame: - for paragraph in shape.text_frame.paragraphs: - if paragraph.text: - slideText.append(paragraph.text) - - if slideText: - content = "\n".join(slideText) - contentItems.append(ContentItem( - 
label=f"slide_{slideNum + 1}", - data=content, - metadata=ContentMetadata( - size=len(content.encode('utf-8')), - pages=1, - mimeType="text/plain", - base64Encoded=False - ) - )) - - if not contentItems: - # Fallback: treat as binary if no text extracted - contentItems.append(ContentItem( - label="presentation", - data=base64.b64encode(fileData).decode('utf-8'), - metadata=ContentMetadata( - size=len(fileData), - pages=len(prs.slides) if hasattr(prs, 'slides') else 1, - mimeType="application/vnd.openxmlformats-officedocument.presentationml.presentation", - base64Encoded=True - ) - )) - - except ImportError: - # python-pptx not available, treat as binary - contentItems.append(ContentItem( - label="presentation", - data=base64.b64encode(fileData).decode('utf-8'), - metadata=ContentMetadata( - size=len(fileData), - pages=1, - mimeType="application/vnd.openxmlformats-officedocument.presentationml.presentation", - base64Encoded=True - ) - )) - - return contentItems - - except Exception as e: - logger.error(f"Error processing PowerPoint document: {str(e)}") - raise FileProcessingError(f"Failed to process PowerPoint document: {str(e)}") - - async def _processBinary(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: - """Process binary document""" - try: - return [ContentItem( - label="binary", - data=base64.b64encode(fileData).decode('utf-8'), - metadata=ContentMetadata( - size=len(fileData), - mimeType=mimeType, - base64Encoded=True, - error="Unsupported file type" - ) - )] - except Exception as e: - logger.error(f"Error processing binary document: {str(e)}") - raise FileProcessingError(f"Failed to process binary document: {str(e)}") - - async def _aiDataExtraction(self, contentItems: List[ContentItem], prompt: str) -> List[ContentItem]: - """ - Process content items with AI, handling chunking based on content type. 
- - Args: - contentItems: List of content items to process - prompt: Prompt for AI content extraction - - Returns: - List of processed content items - """ - processedItems = [] - - for item in contentItems: - try: - # Get content type from metadata - mimeType = item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain" - - - # Chunk content based on type - if mimeType.startswith('text/'): - chunks = self._chunkText(item.data, mimeType) - elif mimeType == "image/svg+xml": - # SVG files are XML, treat as text - chunks = self._chunkXml(item.data) - elif mimeType.startswith('image/'): - # Images should not be chunked - process as single unit - chunks = [item.data] - elif mimeType == "application/pdf": - chunks = self._chunkPdf(item.data) - elif mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": - chunks = self._chunkDocx(item.data) - elif mimeType == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": - chunks = self._chunkXlsx(item.data) - elif mimeType.startswith('application/vnd.openxmlformats-officedocument.presentationml.presentation'): - chunks = self._chunkPptx(item.data) - elif mimeType.startswith('text/x-') or mimeType.startswith('application/') and any(keyword in mimeType for keyword in ['script', 'code', 'source', 'yaml', 'toml', 'dockerfile', 'makefile', 'cmake', 'gradle', 'maven']): - # Programming languages, configuration files, and build files - chunks = self._chunkCode(item.data) - else: - # Binary data - no chunking - chunks = [item.data] - - # Process each chunk - chunkResults = [] - for chunk in chunks: - # Process with AI based on content type - try: - if mimeType.startswith('image/') and mimeType != "image/svg+xml": - # For images (excluding SVG), analyze via centralized AI service - imagePrompt = f""" - Analyze this image and extract the actual content and information from it. - Focus on extracting text, data, charts, diagrams, or any meaningful content. - If there's text in the image, extract it. If there are charts or diagrams, describe the data. - Return the extracted content in a clear, structured text format. - - Original prompt: {prompt} - """ - image_doc = ChatDocument(fileData=chunk, fileName="image", mimeType=mimeType) - # Use direct import to avoid circular dependency - aiService = AiService(AiObjects()) - processedContent = await aiService.callAi( - prompt=imagePrompt, - documents=[image_doc], - options={ - "process_type": "image", - "operation_type": "analyse_content", - "priority": "balanced", - "compress_documents": True, - "max_cost": 0.03 - } - ) - else: - # For text content (including SVG), use text AI service - # Neutralize content if neutralizer is enabled (only for text) - contentToProcess = chunk - if self._neutralizer and contentToProcess: - contentToProcess = self._neutralizer.neutralize(contentToProcess) - - # Create AI prompt for text content - aiPrompt = f""" - Extract relevant information from this content based on the following prompt: - - PROMPT: {prompt} - - CONTENT: - {contentToProcess} - - Return ONLY the extracted information in a clear, concise format. 
- """ - - # Special handling for JavaScript and other code files - preserve complete content - if mimeType == "application/javascript" or mimeType == "application/typescript" or mimeType.startswith("text/x-") or any(keyword in mimeType for keyword in ['script', 'code', 'source']): - # For code files, preserve the complete content without AI processing - processedContent = contentToProcess - else: - # Use direct import to avoid circular dependency - aiService = AiService(AiObjects()) - processedContent = await aiService.callAi( - prompt=aiPrompt, - documents=None, - options={ - "process_type": "text", - "operation_type": "analyse_content", - "priority": "speed", - "compress_prompt": True, - "compress_documents": False, - "max_cost": 0.01, - "max_processing_time": 15 - } - ) - - chunkResults.append(processedContent) - except Exception as aiError: - logger.error(f"AI processing failed for chunk: {str(aiError)}") - # For non-text content, don't fallback to binary data - if mimeType.startswith('image/') or mimeType.startswith('video/') or mimeType.startswith('audio/'): - logger.warning(f"Skipping binary content fallback for {mimeType}") - continue # Skip this chunk entirely - else: - # Only fallback to original content for text-based formats - chunkResults.append(chunk) - - # Combine chunk results - if chunkResults: - # For text content, combine all chunks - if (mimeType.startswith('text/') or - mimeType in ["application/pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - "application/vnd.openxmlformats-officedocument.presentationml.presentation"] or - mimeType.startswith('text/x-') or - mimeType.startswith('application/') and any(keyword in mimeType for keyword in ['script', 'code', 'source', 'yaml', 'toml', 'dockerfile', 'makefile', 'cmake', 'gradle', 'maven', 'javascript', 'typescript', 'sql', 'dart'])): - combinedResult = "\n".join(chunkResults) - else: - # For binary content, use the first result - combinedResult = chunkResults[0] - else: - # No chunks processed, use original content - combinedResult = item.data - - # Only add processed item if we have results - if combinedResult and combinedResult.strip(): - processedItems.append(ContentItem( - label=item.label, - data=combinedResult, - metadata=ContentMetadata( - size=len(combinedResult.encode('utf-8')), - pages=item.metadata.pages if hasattr(item.metadata, 'pages') else 1, - mimeType=item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain", - base64Encoded=item.metadata.base64Encoded if hasattr(item.metadata, 'base64Encoded') else False - ) - )) - else: - logger.warning(f"No processed content available for {item.label}, skipping item") - - except Exception as e: - logger.error(f"Error processing content chunk: {str(e)}") - # Add original content if processing fails - processedItems.append(item) - - return processedItems - - - - def _chunkText(self, content: str, mimeType: str) -> List[str]: - """Chunk text content based on mime type""" - if mimeType == "text/plain": - return self._chunkPlainText(content) - elif mimeType == "text/csv": - return self._chunkCsv(content) - elif mimeType == "application/json": - return self._chunkJson(content) - elif mimeType == "application/xml": - return self._chunkXml(content) - elif mimeType == "text/html": - return self._chunkHtml(content) - elif mimeType == "text/markdown" or mimeType == "text/x-rst" or mimeType == "text/x-wiki": - return self._chunkMarkdown(content) - elif 
mimeType == "application/javascript" or mimeType == "application/typescript": - # JavaScript and TypeScript files get special handling - return self._chunkJavaScript(content) - elif mimeType.startswith("text/x-") or mimeType.startswith("application/") and any(keyword in mimeType for keyword in ['script', 'code', 'source', 'yaml', 'toml', 'dockerfile', 'makefile', 'cmake', 'gradle', 'maven']): - # Programming languages, configuration files, and build files - return self._chunkCode(content) - elif mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": - # Word documents with markdown formatting - return self._chunkWordDocument(content) - elif mimeType == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": - # Excel documents with structured data - return self._chunkExcelDocument(content) - else: - return self._chunkPlainText(content) - - def _chunkPlainText(self, content: str) -> List[str]: - """Chunk plain text content""" - chunks = [] - currentChunk = [] - currentSize = 0 - - for line in content.split('\n'): - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["plain"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - - def _chunkCsv(self, content: str) -> List[str]: - """Chunk CSV content""" - chunks = [] - currentChunk = [] - currentSize = 0 - - for line in content.split('\n'): - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["csv"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - - def _chunkJson(self, content: str) -> List[str]: - """Chunk JSON content""" - try: - data = json.loads(content) - chunks = [] - currentChunk = [] - currentSize = 0 - - def processValue(value, path=""): - nonlocal currentChunk, currentSize - valueStr = json.dumps({path: value}) if path else json.dumps(value) - valueSize = len(valueStr.encode('utf-8')) - - if currentSize + valueSize > self.chunkSizes["json"]: - if currentChunk: - chunks.append(json.dumps(currentChunk)) - currentChunk = [value] - currentSize = valueSize - else: - currentChunk.append(value) - currentSize += valueSize - - if isinstance(data, list): - for i, item in enumerate(data): - processValue(item, str(i)) - elif isinstance(data, dict): - for key, value in data.items(): - processValue(value, key) - else: - processValue(data) - - if currentChunk: - chunks.append(json.dumps(currentChunk)) - - return chunks - except json.JSONDecodeError: - return [content] - - def _chunkXml(self, content: str) -> List[str]: - """Chunk XML content""" - try: - root = ET.fromstring(content) - chunks = [] - currentChunk = [] - currentSize = 0 - - def processElement(element, path=""): - nonlocal currentChunk, currentSize - elementStr = ET.tostring(element, encoding='unicode') - elementSize = len(elementStr.encode('utf-8')) - - if currentSize + elementSize > self.chunkSizes["xml"]: - if currentChunk: - chunks.append(''.join(currentChunk)) - currentChunk = [elementStr] - currentSize = elementSize - else: - currentChunk.append(elementStr) - currentSize += elementSize - - for child in root: - processElement(child) - - if currentChunk: - 
chunks.append(''.join(currentChunk)) - - return chunks - except ET.ParseError: - return [content] - - def _chunkHtml(self, content: str) -> List[str]: - """Chunk HTML content with improved semantic chunking""" - try: - soup = BeautifulSoup(content, 'html.parser') - chunks = [] - currentChunk = [] - currentSize = 0 - - # Use smaller chunk size for HTML to avoid token limits - html_chunk_size = min(self.chunkSizes["html"], 15000) # Max 15KB per chunk - - def processElement(element): - nonlocal currentChunk, currentSize - elementStr = str(element) - elementSize = len(elementStr.encode('utf-8')) - - # If element is too large, split it - if elementSize > html_chunk_size: - # Split large elements by their content - if hasattr(element, 'get_text'): - text_content = element.get_text(separator='\n', strip=True) - if text_content: - # Split text content into smaller chunks - text_chunks = self._chunkTextBySize(text_content, html_chunk_size) - for text_chunk in text_chunks: - if currentChunk: - chunks.append(''.join(currentChunk)) - currentChunk = [f"<{element.name}>{text_chunk}"] - currentSize = len(currentChunk[0].encode('utf-8')) - else: - # For elements without text, just add them - if currentChunk: - chunks.append(''.join(currentChunk)) - currentChunk = [elementStr] - currentSize = elementSize - elif currentSize + elementSize > html_chunk_size: - if currentChunk: - chunks.append(''.join(currentChunk)) - currentChunk = [elementStr] - currentSize = elementSize - else: - currentChunk.append(elementStr) - currentSize += elementSize - - # Process elements in order of importance - for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']): - processElement(element) - - for element in soup.find_all(['p', 'div', 'section', 'article']): - processElement(element) - - for element in soup.find_all(['ul', 'ol', 'table']): - processElement(element) - - # Process remaining elements - for element in soup.find_all(): - if element.name not in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'section', 'article', 'ul', 'ol', 'table']: - processElement(element) - - if currentChunk: - chunks.append(''.join(currentChunk)) - - return chunks - except Exception: - return [content] - - def _chunkTextBySize(self, text: str, max_size: int) -> List[str]: - """Helper method to chunk text by size""" - chunks = [] - current_chunk = "" - - for line in text.split('\n'): - line_size = len(line.encode('utf-8')) - if len(current_chunk.encode('utf-8')) + line_size > max_size: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = line - else: - current_chunk += "\n" + line if current_chunk else line - - if current_chunk: - chunks.append(current_chunk.strip()) - - return chunks - - def _chunkMarkdown(self, content: str) -> List[str]: - """Chunk Markdown content""" - chunks = [] - currentChunk = [] - currentSize = 0 - - # Split by headers, lists, and code blocks - # This is a simplified approach; a more robust solution would involve a proper Markdown parser - lines = content.split('\n') - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["text"]: # Use "text" chunk size for Markdown - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - - def _chunkCode(self, content: str) -> List[str]: - """Chunk code content with optimized chunking for programming 
languages""" - chunks = [] - currentChunk = [] - currentSize = 0 - - # Use larger chunk size for code to minimize unnecessary splitting - # Code files often have long lines and complex structures - code_chunk_size = min(self.chunkSizes["code"], 80000) # Max 80KB per chunk for code - - # Split by lines to preserve code structure - lines = content.split('\n') - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > code_chunk_size: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - - def _chunkJavaScript(self, content: str) -> List[str]: - """Chunk JavaScript content with optimized chunking for JavaScript files""" - chunks = [] - currentChunk = [] - currentSize = 0 - - # Use larger chunk size for JavaScript to minimize unnecessary splitting - # JavaScript files often have long lines and complex structures - js_chunk_size = min(self.chunkSizes["javascript"], 80000) # Max 80KB per chunk for JavaScript - - # Split by lines to preserve code structure - lines = content.split('\n') - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > js_chunk_size: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - - def _chunkBinary(self, content: str) -> List[str]: - """Chunk binary content""" - try: - # Check if content is base64 encoded or plain text - try: - # Try to decode as base64 - binaryData = base64.b64decode(content) - # If successful, it's base64 - chunk the binary data - chunks = [] - chunkSize = self.chunkSizes["binary"] - - for i in range(0, len(binaryData), chunkSize): - chunk = binaryData[i:i + chunkSize] - chunks.append(base64.b64encode(chunk).decode('utf-8')) - - return chunks - except Exception: - # If base64 decoding fails, treat as text and chunk by lines - lines = content.split('\n') - chunks = [] - currentChunk = [] - currentSize = 0 - - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["binary"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - except Exception: - return [content] - - async def _chunkPdf(self, content: str) -> List[str]: - """Chunk PDF content""" - try: - # Content is already text from _processPdf, not base64 - # Split by lines to create chunks - lines = content.split('\n') - chunks = [] - currentChunk = [] - currentSize = 0 - - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["pdf"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - except Exception: - return [content] - - async def _chunkDocx(self, content: str) -> List[str]: - """Chunk Word document content""" - try: - # Content is already text from _processDocx, not base64 - # Split by lines to create chunks - lines = content.split('\n') - chunks 
= [] - currentChunk = [] - currentSize = 0 - - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["docx"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - except Exception: - return [content] - - async def _chunkXlsx(self, content: str) -> List[str]: - """Chunk Excel document content""" - try: - # Content is already text (CSV format) from _processXlsx, not base64 - # Split by lines to create chunks - lines = content.split('\n') - chunks = [] - currentChunk = [] - currentSize = 0 - - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["xlsx"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - except Exception: - return [content] - - async def _chunkPptx(self, content: str) -> List[str]: - """Chunk PowerPoint document content""" - try: - # Content is already text from PowerPoint processing, not base64 - # Split by lines to create chunks - lines = content.split('\n') - chunks = [] - currentChunk = [] - currentSize = 0 - - for line in lines: - lineSize = len(line.encode('utf-8')) - if currentSize + lineSize > self.chunkSizes["pptx"]: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - except Exception: - return [content] - - def _chunkWordDocument(self, content: str) -> List[str]: - """Chunk Word document content with markdown formatting preservation""" - chunks = [] - currentChunk = [] - currentSize = 0 - - # Use larger chunk size for Word documents to preserve formatting - word_chunk_size = min(self.chunkSizes["docx"], 60000) # Max 60KB per chunk - - # Split by lines to preserve document structure - lines = content.split('\n') - for line in lines: - lineSize = len(line.encode('utf-8')) - - # Check if adding this line would exceed chunk size - if currentSize + lineSize > word_chunk_size: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - # Add the last chunk if it exists - if currentChunk: - chunks.append('\n'.join(currentChunk)) - - return chunks - - def _chunkExcelDocument(self, content: str) -> List[str]: - """Chunk Excel document content with data structure preservation""" - chunks = [] - currentChunk = [] - currentSize = 0 - - # Use larger chunk size for Excel documents to preserve table structure - excel_chunk_size = min(self.chunkSizes["xlsx"], 80000) # Max 80KB per chunk - - # Split by lines to preserve CSV structure - lines = content.split('\n') - for line in lines: - lineSize = len(line.encode('utf-8')) - - # Check if adding this line would exceed chunk size - if currentSize + lineSize > excel_chunk_size: - if currentChunk: - chunks.append('\n'.join(currentChunk)) - currentChunk = [line] - currentSize = lineSize - else: - currentChunk.append(line) - currentSize += lineSize - - # Add the last chunk if it exists - if currentChunk: - 
chunks.append('\n'.join(currentChunk)) - - return chunks - - \ No newline at end of file diff --git a/modules/services/serviceExtraction/chunking/structure_chunker.py b/modules/services/serviceExtraction/chunking/structure_chunker.py index 921b3d4d..79ceb883 100644 --- a/modules/services/serviceExtraction/chunking/structure_chunker.py +++ b/modules/services/serviceExtraction/chunking/structure_chunker.py @@ -1,7 +1,7 @@ from typing import Any, Dict, List import json -from ..types import ContentPart +from modules.datamodels.datamodelExtraction import ContentPart from ..subRegistry import Chunker diff --git a/modules/services/serviceExtraction/chunking/table_chunker.py b/modules/services/serviceExtraction/chunking/table_chunker.py index 9a614896..f1f97411 100644 --- a/modules/services/serviceExtraction/chunking/table_chunker.py +++ b/modules/services/serviceExtraction/chunking/table_chunker.py @@ -1,6 +1,6 @@ from typing import Any, Dict, List -from ..types import ContentPart +from modules.datamodels.datamodelExtraction import ContentPart from ..subRegistry import Chunker diff --git a/modules/services/serviceExtraction/chunking/text_chunker.py b/modules/services/serviceExtraction/chunking/text_chunker.py index 69218837..35c75168 100644 --- a/modules/services/serviceExtraction/chunking/text_chunker.py +++ b/modules/services/serviceExtraction/chunking/text_chunker.py @@ -1,6 +1,6 @@ from typing import Any, Dict, List -from ..types import ContentPart +from modules.datamodels.datamodelExtraction import ContentPart from ..subRegistry import Chunker diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py index 1f6f36e5..8ec970e8 100644 --- a/modules/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import uuid from .subRegistry import ExtractorRegistry, ChunkerRegistry @@ -7,51 +7,50 @@ from modules.datamodels.datamodelExtraction import ExtractedContent, ContentPart class ExtractionService: - def __init__(self): + def __init__(self, services: Optional[Any] = None): + self.services = services self._extractorRegistry = ExtractorRegistry() self._chunkerRegistry = ChunkerRegistry() - def extractDocuments(self, documentList: List[Dict[str, Any]], options: Dict[str, Any]) -> Any: - processIndividually = options.get("processDocumentsIndividually", True) + def extractContent(self, documentList: List[Dict[str, Any]], options: Dict[str, Any]) -> List[ExtractedContent]: + results: List[ExtractedContent] = [] + for doc in documentList: + ec = runExtraction( + extractorRegistry=self._extractorRegistry, + chunkerRegistry=self._chunkerRegistry, + documentBytes=doc.get("bytes"), + fileName=doc.get("fileName"), + mimeType=doc.get("mimeType"), + options=options + ) + # Attach document id to parts if missing + for p in ec.parts: + if "documentId" not in p.metadata: + p.metadata["documentId"] = doc.get("id") or str(uuid.uuid4()) + ec = applyAiIfRequested(ec, options) + results.append(ec) + return results - if processIndividually: - results: List[ExtractedContent] = [] - for doc in documentList: - ec = runExtraction( - extractorRegistry=self._extractorRegistry, - chunkerRegistry=self._chunkerRegistry, - documentBytes=doc.get("bytes"), - fileName=doc.get("fileName"), - mimeType=doc.get("mimeType"), - options=options - ) - ec = applyAiIfRequested(ec, options) - 
results.append(ec)
-            return results
-        else:
-            allParts: List[ContentPart] = []
-            for doc in documentList:
-                ec = runExtraction(
-                    extractorRegistry=self._extractorRegistry,
-                    chunkerRegistry=self._chunkerRegistry,
-                    documentBytes=doc.get("bytes"),
-                    fileName=doc.get("fileName"),
-                    mimeType=doc.get("mimeType"),
-                    options=options
-                )
-                for p in ec.parts:
-                    if "documentId" not in p.metadata:
-                        p.metadata["documentId"] = doc.get("id") or str(uuid.uuid4())
-                allParts.extend(ec.parts)
+    async def extractContentFromDocuments(self, prompt: str, documents: List[Dict[str, Any]], options: Optional[Dict[str, Any]] = None) -> List[ExtractedContent]:
+        """
+        Batch extract content from multiple documents.

-            pooled = poolAndLimit(allParts, self._chunkerRegistry, options)
-            # In pooled mode we return a dict containing pooled parts and an optional AI output
-            pooledResult: Dict[str, Any] = {
-                "parts": pooled,
-                "summary": {"documents": len(documentList)}
-            }
-            aiOut = applyAiIfRequested(ExtractedContent(id=str(uuid.uuid4()), parts=pooled, summary=None), options)
-            pooledResult["ai"] = aiOut.summary if isinstance(aiOut, ExtractedContent) else aiOut
-            return pooledResult
+        Args:
+            prompt: Instructional prompt for optional AI post-processing/selection.
+            documents: List of dicts with keys: id, bytes, fileName, mimeType.
+            options: Optional extraction options. "ai" config may be provided.
+
+        Returns:
+            List[ExtractedContent]: one per input document in order.
+        """
+        # Build options safely and inject prompt for downstream AI selection if desired
+        effectiveOptions: Dict[str, Any] = options.copy() if options else {}
+        aiCfg = effectiveOptions.get("ai") or {}
+        if prompt:
+            aiCfg["prompt"] = prompt
+        effectiveOptions["ai"] = aiCfg
+
+        # Delegate to existing synchronous pipeline
+        return self.extractContent(documents, effectiveOptions)
diff --git a/modules/services/serviceExtraction/subRegistry.py b/modules/services/serviceExtraction/subRegistry.py
index ceb9b1b5..9c0adab5 100644
--- a/modules/services/serviceExtraction/subRegistry.py
+++ b/modules/services/serviceExtraction/subRegistry.py
@@ -1,6 +1,6 @@
 from typing import Any, Dict, Optional

-from .types import ContentPart
+from modules.datamodels.datamodelExtraction import ContentPart


 class Extractor:
diff --git a/modules/services/serviceDocument/mainServiceDocumentGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py
similarity index 59%
rename from modules/services/serviceDocument/mainServiceDocumentGeneration.py
rename to modules/services/serviceGeneration/mainServiceGeneration.py
index 6c1b2d42..f18f071b 100644
--- a/modules/services/serviceDocument/mainServiceDocumentGeneration.py
+++ b/modules/services/serviceGeneration/mainServiceGeneration.py
@@ -1,9 +1,11 @@
 import logging
+import uuid
 from typing import Any, Dict, List, Optional
 from datetime import datetime, UTC
 import re
 from modules.shared.timezoneUtils import get_utc_timestamp
-from modules.services.serviceDocument.subDocumentUtility import (
+from modules.datamodels.datamodelChat import ChatDocument
+from modules.services.serviceGeneration.subDocumentUtility import (
     getFileExtension,
     getMimeTypeFromExtension,
     detectMimeTypeFromContent,
@@ -13,9 +15,13 @@ from modules.services.serviceDocument.subDocumentUtility import (

 logger = logging.getLogger(__name__)

-class DocumentGenerationService:
-    def __init__(self, service):
-        self.service = service
+class GenerationService:
+    def __init__(self, serviceCenter=None):
+        # Directly use interfaces from the provided service center (no 
self.service calls) + self.serviceCenter = serviceCenter + self.interfaceDbComponent = getattr(serviceCenter, 'interfaceDbComponent', None) if serviceCenter else None + self.interfaceDbChat = getattr(serviceCenter, 'interfaceDbChat', None) if serviceCenter else None + self.workflow = getattr(serviceCenter, 'workflow', None) if serviceCenter else None def processActionResultDocuments(self, action_result, action, workflow) -> List[Dict[str, Any]]: """ @@ -53,7 +59,8 @@ class DocumentGenerationService: mime_type = doc.mimeType if mime_type == "application/octet-stream": content = doc.documentData - mime_type = detectMimeTypeFromContent(content, doc.documentName, self.service) + # Detect MIME without relying on a service center + mime_type = detectMimeTypeFromContent(content, doc.documentName) return { 'fileName': doc.documentName, @@ -98,8 +105,8 @@ class DocumentGenerationService: logger.info(f"Document {document_name} has content: {len(content)} characters") - # Create document with file in one step - document = self.service.createDocument( + # Create document with file in one step using interfaces directly + document = self._createDocument( fileName=document_name, mimeType=mime_type, content=content, @@ -126,9 +133,9 @@ class DocumentGenerationService: def _setDocumentWorkflowContext(self, document, action, workflow): """Set workflow context on a document for proper routing and labeling""" try: - # Get current workflow context from service center - workflow_context = self.service.getWorkflowContext() - workflow_stats = self.service.getWorkflowStats() + # Get current workflow context directly from workflow object + workflow_context = self._getWorkflowContext(workflow) + workflow_stats = self._getWorkflowStats(workflow) current_round = workflow_context.get('currentRound', 0) current_task = workflow_context.get('currentTask', 0) @@ -155,3 +162,99 @@ class DocumentGenerationService: except Exception as e: logger.warning(f"Could not set workflow context on document: {str(e)}") + + def _createDocument(self, fileName: str, mimeType: str, content: str, base64encoded: bool = True, messageId: str = None) -> Optional[ChatDocument]: + """Create file and ChatDocument using interfaces without service indirection.""" + try: + if not self.interfaceDbComponent: + logger.error("Component interface not available for document creation") + return None + # Convert content to bytes + if base64encoded: + import base64 + content_bytes = base64.b64decode(content) + else: + content_bytes = content.encode('utf-8') + # Create file and store data + file_item = self.interfaceDbComponent.createFile( + name=fileName, + mimeType=mimeType, + content=content_bytes + ) + self.interfaceDbComponent.createFileData(file_item.id, content_bytes) + # Collect file info + file_info = self._getFileInfo(file_item.id) + if not file_info: + logger.error(f"Could not get file info for fileId: {file_item.id}") + return None + # Build ChatDocument + document = ChatDocument( + id=str(uuid.uuid4()), + messageId=messageId or "", + fileId=file_item.id, + fileName=file_info.get("fileName", fileName), + fileSize=file_info.get("size", 0), + mimeType=file_info.get("mimeType", mimeType) + ) + # Ensure document can access component interface later + if hasattr(document, 'setComponentInterface') and self.interfaceDbComponent: + try: + document.setComponentInterface(self.interfaceDbComponent) + except Exception: + pass + return document + except Exception as e: + logger.error(f"Error creating document: {str(e)}") + return None + + def _getFileInfo(self, 
fileId: str) -> Optional[Dict[str, Any]]: + try: + if not self.interfaceDbComponent: + return None + file_item = self.interfaceDbComponent.getFile(fileId) + if file_item: + return { + "id": file_item.id, + "fileName": file_item.fileName, + "size": file_item.fileSize, + "mimeType": file_item.mimeType, + "fileHash": getattr(file_item, 'fileHash', None), + "creationDate": getattr(file_item, 'creationDate', None) + } + return None + except Exception as e: + logger.error(f"Error getting file info for {fileId}: {str(e)}") + return None + + def _getWorkflowContext(self, workflow) -> Dict[str, int]: + try: + return { + 'currentRound': getattr(workflow, 'currentRound', 0), + 'currentTask': getattr(workflow, 'currentTask', 0), + 'currentAction': getattr(workflow, 'currentAction', 0) + } + except Exception: + return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0} + + def _getWorkflowStats(self, workflow) -> Dict[str, Any]: + try: + context = self._getWorkflowContext(workflow) + return { + 'currentRound': context['currentRound'], + 'currentTask': context['currentTask'], + 'currentAction': context['currentAction'], + 'totalTasks': getattr(workflow, 'totalTasks', 0), + 'totalActions': getattr(workflow, 'totalActions', 0), + 'workflowStatus': getattr(workflow, 'status', 'unknown'), + 'workflowId': getattr(workflow, 'id', 'unknown') + } + except Exception: + return { + 'currentRound': 0, + 'currentTask': 0, + 'currentAction': 0, + 'totalTasks': 0, + 'totalActions': 0, + 'workflowStatus': 'unknown', + 'workflowId': 'unknown' + } \ No newline at end of file diff --git a/modules/services/serviceDocument/subDocumentUtility.py b/modules/services/serviceGeneration/subDocumentUtility.py similarity index 100% rename from modules/services/serviceDocument/subDocumentUtility.py rename to modules/services/serviceGeneration/subDocumentUtility.py diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py index 06dbfee5..7ab66872 100644 --- a/modules/services/serviceWorkflow/mainServiceWorkflow.py +++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py @@ -4,8 +4,8 @@ from typing import Dict, Any, List, Optional from modules.datamodels.datamodelUam import User, UserConnection from modules.datamodels.datamodelChat import ChatDocument, ChatMessage from modules.datamodels.datamodelChat import ExtractedContent -from modules.services.serviceDocument.mainServiceDocumentExtraction import DocumentExtractionService -from modules.services.serviceDocument.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData +from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService +from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData from modules.shared.timezoneUtils import get_utc_timestamp from modules.services.serviceAi.mainServiceAi import AiService from modules.security.tokenManager import TokenManager @@ -312,80 +312,6 @@ class WorkflowService: def getFileData(self, fileId: str) -> bytes: """Get file data by ID""" return self.interfaceDbComponent.getFileData(fileId) - - async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent: - """Extract content from ChatDocument using prompt""" - try: - # ChatDocument is just a reference, so we need to get file data using fileId - if not hasattr(document, 'fileId') or not document.fileId: - logger.error(f"Document {document.id} 
has no fileId") - raise ValueError("Document has no fileId") - - # Get file data from service center using document's fileId - fileData = self.getFileData(document.fileId) - if not fileData: - logger.error(f"No file data found for fileId: {document.fileId}") - raise ValueError("No file data found for document") - - # Get fileName and mime type from document properties - try: - fileName = document.fileName - mimeType = document.mimeType - except Exception as e: - # Try to diagnose and recover the issue - diagnosis = self._diagnoseDocumentAccess(document) - logger.error(f"Critical error: Cannot access document properties for document {document.id}. Diagnosis: {diagnosis}") - - # Attempt recovery - if self._recoverDocumentAccess(document): - try: - fileName = document.fileName - mimeType = document.mimeType - logger.info(f"Document access recovered for {document.id} - proceeding with AI extraction") - except Exception as recovery_error: - logger.error(f"Recovery failed for document {document.id}: {str(recovery_error)}") - raise RuntimeError(f"Document {document.id} properties are permanently inaccessible after recovery attempt - cannot proceed with AI extraction: {str(recovery_error)}") - else: - # Recovery failed - don't continue with invalid data - raise RuntimeError(f"Document {document.id} properties are inaccessible and recovery failed. Diagnosis: {diagnosis}") - - # Process with DocumentExtractionService directly (no circular dependency) - docService = DocumentExtractionService(None) # Pass None to avoid circular dependency - content_items = await docService.processFileData( - fileData=fileData, - fileName=fileName, - mimeType=mimeType, - base64Encoded=False, - prompt=prompt, - enableAI=True - ) - - # Convert ContentItem list to ExtractedContent - contents = [] - for item in content_items: - contents.append({ - 'label': item.label, - 'data': item.data, - 'metadata': { - 'mimeType': item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else mimeType, - 'size': item.metadata.size if hasattr(item.metadata, 'size') else len(fileData), - 'base64Encoded': item.metadata.base64Encoded if hasattr(item.metadata, 'base64Encoded') else False - } - }) - - extractedContent = ExtractedContent( - id=document.id, - contents=contents - ) - - # Note: ExtractedContent model only has 'id' and 'contents' fields - # No need to set objectId or objectType as they don't exist in the model - - return extractedContent - - except Exception as e: - logger.error(f"Error extracting from document: {str(e)}") - raise def _diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]: """ @@ -456,43 +382,6 @@ class WorkflowService: logger.error(f"Error during document access recovery for {document.id}: {str(e)}") return False - def createDocument(self, fileName: str, mimeType: str, content: str, base64encoded: bool = True, messageId: str = None) -> ChatDocument: - """Create document with file in one step - handles file creation internally""" - # Convert content to bytes based on base64 flag - if base64encoded: - import base64 - content_bytes = base64.b64decode(content) - else: - content_bytes = content.encode('utf-8') - - # Create the file (hash and size are computed inside interfaceDbComponent) - file_item = self.interfaceDbComponent.createFile( - name=fileName, - mimeType=mimeType, - content=content_bytes - ) - - # Then store the file data - self.interfaceDbComponent.createFileData(file_item.id, content_bytes) - - # Get file info to copy attributes - file_info = self.getFileInfo(file_item.id) - if not 
file_info:
-            logger.error(f"Could not get file info for fileId: {file_item.id}")
-            raise ValueError(f"File info not found for fileId: {file_item.id}")
-
-        # Create document with all file attributes copied
-        document = ChatDocument(
-            id=str(uuid.uuid4()),
-            messageId=messageId or "",  # Use provided messageId or empty string as fallback
-            fileId=file_item.id,
-            fileName=file_info.get("fileName", fileName),
-            fileSize=file_info.get("size", 0),
-            mimeType=file_info.get("mimeType", mimeType)
-        )
-
-        return document
-
     def calculateObjectSize(self, obj: Any) -> int:
         """
         Calculate the size of an object in bytes.
diff --git a/modules/services/test_all_services.py b/modules/services/test_all_services.py
new file mode 100644
index 00000000..280fc7be
--- /dev/null
+++ b/modules/services/test_all_services.py
@@ -0,0 +1,226 @@
+import asyncio
+import os
+import sys
+from typing import List, Dict, Any
+
+# Ensure relative imports work when running directly
+CURRENT_DIR = os.path.dirname(__file__)
+GATEWAY_DIR = os.path.dirname(os.path.dirname(CURRENT_DIR))
+if GATEWAY_DIR not in sys.path:
+    sys.path.append(GATEWAY_DIR)
+
+from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
+from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
+from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
+from modules.datamodels.datamodelAi import AiCallOptions, OperationType, ProcessingMode, Priority
+from modules.services.serviceAi.mainServiceAi import AiService
+
+
+TESTDATA_DIR = os.path.join(GATEWAY_DIR, "testdata")
+
+
+def _read_test_files() -> List[Dict[str, Any]]:
+    files = []
+    for name in os.listdir(TESTDATA_DIR):
+        path = os.path.join(TESTDATA_DIR, name)
+        if not os.path.isfile(path):
+            continue
+        try:
+            with open(path, "rb") as f:
+                data = f.read()
+            mime = _guess_mime(name)
+            files.append({
+                "id": name,
+                "bytes": data,
+                "fileName": name,
+                "mimeType": mime,
+            })
+        except Exception:
+            continue
+    return files
+
+
+def _guess_mime(name: str) -> str:
+    lower = name.lower()
+    if lower.endswith(".pdf"):
+        return "application/pdf"
+    if lower.endswith(".xlsx"):
+        return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
+    if lower.endswith(".jpg") or lower.endswith(".jpeg"):
+        return "image/jpeg"
+    if lower.endswith(".png"):
+        return "image/png"
+    return "application/octet-stream"
+
+
+def run_extraction_1000_bytes() -> None:
+    svc = ExtractionService()
+    docs = _read_test_files()
+    options = {
+        # cap total pooled size per document set
+        "maxSize": 1000,
+        # allow chunking to respect the cap across parts
+        "chunkAllowed": True,
+        # chunk sizes for different content types to help fit under the cap
+        "textChunkSize": 500,
+        "tableChunkSize": 500,
+        "structureChunkSize": 500,
+        # simple merge strategy if supported
+        "mergeStrategy": {},
+    }
+    results = svc.extractContent(docs, options)
+    print("[extraction] documents:", len(docs), "results:", len(results))
+    for i, ec in enumerate(results):
+        total = sum(int(p.metadata.get("size", 0) or 0) for p in ec.parts)
+        print(f"  - doc[{i}] parts={len(ec.parts)} pooledBytes={total}")
+
+
+async def main():
+    print("=== serviceExtraction: compress to 1000 bytes ===")
+    run_extraction_1000_bytes()
+    print("\n=== serviceGeneration: create ActionResult and write output to testdata ===")
+    await run_generation_write_file()
+    print("\n=== serviceAi: planning call + image + pdf extraction ===")
+    await run_ai_tests()
+
+
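+# NOTE: the helpers below run against hand-rolled test doubles, not the real
+# DB-backed interfaces. _ComponentInterface and _ServiceCenter only mimic the
+# attributes GenerationService actually reads (interfaceDbComponent, workflow,
+# ...), and createFileData persists payloads under testdata/ so the generated
+# documents can be inspected by hand.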
+async def run_generation_write_file() -> None:
+    # Minimal stubs for interfaces expected by GenerationService
+    class _FileItem:
+        def __init__(self, file_id: str, file_name: str, mime_type: str, content: bytes):
+            self.id = file_id
+            self.fileName = file_name
+            self.mimeType = mime_type
+            self.fileSize = len(content)
+
+    class _ComponentInterface:
+        def __init__(self):
+            self._files = {}
+        def createFile(self, name: str, mimeType: str, content: bytes):
+            fid = f"test_{len(self._files)+1}"
+            item = _FileItem(fid, name, mimeType, content)
+            self._files[fid] = item
+            return item
+        def createFileData(self, fileId: str, content: bytes):
+            # Persist into testdata directory as requested
+            item = self._files[fileId]
+            out_path = os.path.join(TESTDATA_DIR, f"output_{fileId}_{item.fileName}")
+            with open(out_path, "wb") as f:
+                f.write(content)
+        def getFile(self, fileId: str):
+            return self._files.get(fileId)
+
+    class _ServiceCenter:
+        def __init__(self, comp):
+            self.interfaceDbComponent = comp
+            self.interfaceDbChat = None
+            self.workflow = type("_Wf", (), {"id": "wf_test", "currentRound": 1, "currentTask": 1, "currentAction": 1, "status": "running", "totalTasks": 1, "totalActions": 1})()
+
+    component = _ComponentInterface()
+    center = _ServiceCenter(component)
+    gen = GenerationService(center)
+
+    # Build a fake action and ActionResult with a small text document
+    class _Action:
+        def __init__(self):
+            self.id = "action_test"
+            self.execMethod = "document"
+            self.execAction = "generate"
+            self.execParameters = {}
+            self.execResultLabel = "round1_task1_action1_results"
+    action = _Action()
+
+    content = "This is a generated test file from serviceGeneration test."
+    action_doc = ActionDocument(documentName="test_generated.txt", documentData=content, mimeType="text/plain")
+    action_result = ActionResult(success=True, documents=[action_doc])
+
+    # Pass the message id positionally, matching the call site in handlingTasks
+    docs = gen.createDocumentsFromActionResult(action_result, action, center.workflow, "msg_test")
+    print("[generation] created documents:", len(docs))
+
+
+async def run_ai_tests() -> None:
+    # Create AiService instance (uses internal default model registry; no external creds required for this test)
+    ai = await AiService.create()
+
+    # Planning AI call (like in handlingTasks.generateTaskPlan)
+    plan_options = AiCallOptions(
+        operationType=OperationType.GENERATE_PLAN,
+        priority=Priority.QUALITY,
+        compressPrompt=False,
+        compressContext=False,
+        processingMode=ProcessingMode.DETAILED,
+        maxCost=0.05,
+        maxProcessingTime=10,
+    )
+    plan_prompt = """
+    You are a planning assistant. Return a compact JSON with fields: tasks:[{id, objective, success_criteria:[]}], languageUserDetected:"en".
+    Create exactly one simple task id:"task_1" objective:"Test planning" success_criteria:["done"].
+ """.strip() + plan_resp = await ai.callAi(prompt=plan_prompt, placeholders=None, options=plan_options) + print("[ai] planning response length:", len(plan_resp) if plan_resp else 0) + + # Image content extraction prompt using test JPEG + img_path = os.path.join(TESTDATA_DIR, "00Untitled.jpg") + img_resp = None + if os.path.exists(img_path): + try: + with open(img_path, "rb") as f: + img_bytes = f.read() + img_options = AiCallOptions( + operationType=OperationType.ANALYSE_CONTENT, + priority=Priority.BALANCED, + compressPrompt=True, + compressContext=False, + processingMode=ProcessingMode.ADVANCED, + maxCost=0.02, + maxProcessingTime=10, + ) + img_resp = await ai.callAiImage( + prompt="Describe the content of this image succinctly.", + imageData=img_bytes, + mimeType="image/jpeg", + options=img_options, + ) + print("[ai] image analysis response length:", len(img_resp) if img_resp else 0) + except Exception as e: + print("[ai] image analysis error:", str(e)) + else: + print("[ai] image test file not found; skipping") + + # PDF extraction prompt: emulate text call with document context built via ExtractionService + pdf_path = os.path.join(TESTDATA_DIR, "diagramm_komponenten.pdf") + if os.path.exists(pdf_path): + try: + # Build a minimal ChatDocument-like shim that AiService._callAiText expects via extraction + class _Doc: + def __init__(self, file_path: str, mime: str): + self.id = "doc_pdf" + self.fileName = os.path.basename(file_path) + self.mimeType = mime + with open(file_path, "rb") as f: + self.fileData = f.read() + pdf_doc = _Doc(pdf_path, "application/pdf") + + pdf_options = AiCallOptions( + operationType=OperationType.ANALYSE_CONTENT, + priority=Priority.BALANCED, + compressPrompt=True, + compressContext=True, + processingMode=ProcessingMode.ADVANCED, + maxContextBytes=1000, + chunkAllowed=True, + maxCost=0.02, + maxProcessingTime=10, + ) + pdf_prompt = "Extract key information from the attached PDF." + pdf_resp = await ai.callAi(prompt=pdf_prompt, documents=[pdf_doc], options=pdf_options) + print("[ai] pdf extraction response length:", len(pdf_resp) if pdf_resp else 0) + except Exception as e: + print("[ai] pdf extraction error:", str(e)) + else: + print("[ai] pdf test file not found; skipping") + + diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index 033a89b6..bcc9412d 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -76,66 +76,70 @@ class MethodAi(MethodBase): chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) if chatDocuments: context_parts = [] + # Build batch payload for extraction + batch_docs = [] for doc in chatDocuments: - file_info = self.services.workflow.getFileInfo(doc.fileId) - try: - # Use the document content extraction service with the specific AI prompt context - # This tells the extraction engine exactly what and how to extract - extraction_prompt = f""" - Extract content from this document for AI processing context. - - AI Task: {aiPrompt} - Processing Mode: {processingMode} - Expected Output: {output_extension.upper()} format - - Requirements: - 1. Extract the most relevant text content that would be useful for the AI task - 2. Focus on content that directly relates to: {aiPrompt} - 3. Include key information, data, and insights that the AI needs - 4. 
Provide clean, readable text without formatting artifacts - - Document: {doc.fileName} - """ - - logger.debug(f"Extracting content from {doc.fileName} with task-specific prompt: {extraction_prompt[:100]}...") - - extracted_content = await self.services.documentExtraction.extractContentFromDocument( - prompt=extraction_prompt.strip(), - document=doc - ) - - if extracted_content and extracted_content.contents: - # Get the first content item's data - content = "" - for content_item in extracted_content.contents: - if hasattr(content_item, 'data') and content_item.data: - content += content_item.data + " " + fileBytes = self.services.workflow.getFileData(doc.fileId) if hasattr(doc, 'fileId') else None + except Exception: + fileBytes = None + batch_docs.append({ + "id": getattr(doc, 'id', None), + "bytes": fileBytes or b"", + "fileName": getattr(doc, 'fileName', 'unknown'), + "mimeType": getattr(doc, 'mimeType', None) or "application/octet-stream" + }) - - if content.strip(): - metadata_info = "" - if file_info and includeMetadata: - metadata_info = f" (Size: {file_info.get('fileSize', 'unknown')}, Type: {file_info.get('mimeType', 'unknown')})" - - # Adjust context length based on processing mode and AI task relevance - base_length = 5000 if processingMode == "detailed" else 3000 if processingMode == "advanced" else 2000 - - # For detailed mode, include more context - if processingMode == "detailed": - context_parts.append(f"Document: {doc.fileName}{metadata_info}\nRelevance to AI Task: This document contains content directly related to '{aiPrompt[:100]}...'\nContent:\n{content[:base_length]}...") - else: - context_parts.append(f"Document: {doc.fileName}{metadata_info}\nContent:\n{content[:base_length]}...") - else: - context_parts.append(f"Document: {doc.fileName} [No readable text content - binary file]") + extraction_prompt = ( + f"Extract content for AI task context. Task: {aiPrompt}. Mode: {processingMode}." + ) + try: + extracted_list = await self.services.extraction.extractContentFromDocuments( + prompt=extraction_prompt, + documents=batch_docs, + options={"ai": {"enabled": False}, "mergeStrategy": {}} + ) + except Exception: + extracted_list = [] + + # Helper to aggregate readable text from parts + def _partsToText(parts) -> str: + lines: List[str] = [] + for p in (parts or []): + try: + if getattr(p, 'typeGroup', '') in ("text", "table", "structure") and getattr(p, 'data', None): + lines.append(p.data) + except Exception: + continue + return "\n\n".join(lines) + + for i, doc in enumerate(chatDocuments): + file_info = self.services.workflow.getFileInfo(doc.fileId) + content = "" + try: + ec = extracted_list[i] if i < len(extracted_list) else None + if ec: + content = _partsToText(getattr(ec, 'parts', [])) + except Exception: + content = "" + + if content.strip(): + metadata_info = "" + if file_info and includeMetadata: + metadata_info = f" (Size: {file_info.get('fileSize', 'unknown')}, Type: {file_info.get('mimeType', 'unknown')})" + base_length = 5000 if processingMode == "detailed" else 3000 if processingMode == "advanced" else 2000 + if processingMode == "detailed": + context_parts.append( + f"Document: {doc.fileName}{metadata_info}\nRelevance to AI Task: This document contains content directly related to '{aiPrompt[:100]}...'\nContent:\n{content[:base_length]}..." 
+ ) else: - context_parts.append(f"Document: {doc.fileName} [No readable text content - binary file]") - - except Exception as extract_error: - context_parts.append(f"Document: {doc.fileName} [Could not extract content - binary file]") - + context_parts.append( + f"Document: {doc.fileName}{metadata_info}\nContent:\n{content[:base_length]}..." + ) + else: + context_parts.append(f"Document: {doc.fileName} [No readable text content - binary file]") + if context_parts: - # Add a summary header to help the AI understand the context context_header = f""" === DOCUMENT CONTEXT FOR AI PROCESSING === AI Task: {aiPrompt[:100]}... @@ -147,7 +151,6 @@ class MethodAi(MethodBase): Use this information to provide the most accurate and helpful response. ================================================ """ - context = context_header + "\n\n" + "\n\n".join(context_parts) logger.info(f"Included {len(chatDocuments)} documents in AI context with task-specific extraction") diff --git a/modules/workflows/methods/methodDocument.py b/modules/workflows/methods/methodDocument.py index 61b4741b..06d5e52e 100644 --- a/modules/workflows/methods/methodDocument.py +++ b/modules/workflows/methods/methodDocument.py @@ -62,32 +62,36 @@ class MethodDocument(MethodBase): error="No documents found for the provided reference" ) - # Extract content from all documents using AI + # Batch extract content from all documents at once all_extracted_content = [] file_infos = [] - + batch_docs = [] for chatDocument in chatDocuments: file_info = self.services.workflow.getFileInfo(chatDocument.fileId) - + if includeMetadata: + file_infos.append(file_info) try: - # Use the document content extraction service with the specific AI prompt - # This handles all document types (text, binary, image, etc.) intelligently - extracted_content = await self.services.documentExtraction.extractContentFromDocument( - prompt=aiPrompt, - document=chatDocument - ) - - if extracted_content and extracted_content.contents: - all_extracted_content.append(extracted_content) - if includeMetadata: - file_infos.append(file_info) - logger.info(f"Successfully extracted content from {chatDocument.fileName}") - else: - logger.warning(f"No content extracted from {chatDocument.fileName}") - - except Exception as e: - logger.error(f"Error extracting content from {chatDocument.fileName}: {str(e)}") - continue + data = self.services.workflow.getFileData(chatDocument.fileId) if hasattr(chatDocument, 'fileId') else None + except Exception: + data = None + batch_docs.append({ + "id": getattr(chatDocument, 'id', None), + "bytes": data or b"", + "fileName": getattr(chatDocument, 'fileName', 'unknown'), + "mimeType": getattr(chatDocument, 'mimeType', None) or "application/octet-stream" + }) + + try: + extracted_list = await self.services.extraction.extractContentFromDocuments( + prompt=aiPrompt, + documents=batch_docs, + options={"ai": {"enabled": False}} + ) + except Exception as e: + logger.error(f"Batch extraction failed: {str(e)}") + extracted_list = [] + + all_extracted_content = extracted_list or [] if not all_extracted_content: return ActionResult.isFailure( @@ -97,20 +101,24 @@ class MethodDocument(MethodBase): # Process each document individually with its own format conversion output_documents = [] - for i, (chatDocument, extracted_content) in enumerate(zip(chatDocuments, all_extracted_content)): + for i, chatDocument in enumerate(chatDocuments): # Extract text content from this document text_content = "" - if hasattr(extracted_content, 'contents') and 
extracted_content.contents:
-                # Extract text from ContentItem objects
-                text_parts = []
-                for content_item in extracted_content.contents:
-                    if hasattr(content_item, 'data') and content_item.data:
-                        text_parts.append(content_item.data)
-                text_content = "\n".join(text_parts)
-            elif isinstance(extracted_content, str):
-                text_content = extracted_content
-            else:
-                text_content = str(extracted_content)
+            try:
+                ec = all_extracted_content[i] if i < len(all_extracted_content) else None
+                if ec and hasattr(ec, 'parts'):
+                    text_parts = []
+                    for part in getattr(ec, 'parts', []):
+                        try:
+                            if getattr(part, 'typeGroup', '') in ("text", "table", "structure") and getattr(part, 'data', None):
+                                text_parts.append(part.data)
+                        except Exception:
+                            continue
+                    text_content = "\n".join(text_parts)
+                else:
+                    text_content = ""
+            except Exception:
+                text_content = ""

             # Get the expected format for this document (or use default)
             target_format = None
@@ -692,27 +700,38 @@ class MethodDocument(MethodBase):
                 content = ""
                 logger.info(f"Processing document: type={type(doc)}")

-                # Get actual file content using the document content extraction service
+                # Batch extraction approach: prepare one doc payload and call extractor
                 try:
-                    extracted_content = await self.services.documentExtraction.extractContentFromDocument(
-                        prompt="Extract readable text content for HTML report generation",
-                        document=doc
-                    )
-
-                    if extracted_content and extracted_content.contents:
-                        # Get the first content item's data
-                        for content_item in extracted_content.contents:
-                            if hasattr(content_item, 'data') and content_item.data:
-                                content += content_item.data + " "
-
-                        if content.strip():
-                            logger.info(f"   Retrieved content from file: {len(content)} characters")
-                        else:
-                            logger.info(f"   No readable text content found (binary file)")
-                    else:
-                        logger.info(f"   No content extracted (binary file)")
+                    try:
+                        data = self.services.workflow.getFileData(doc.fileId) if hasattr(doc, 'fileId') else None
+                    except Exception:
+                        data = None
+                    extracted_list = await self.services.extraction.extractContentFromDocuments(
+                        prompt="Extract readable text content for HTML report generation",
+                        documents=[{
+                            "id": getattr(doc, 'id', None),
+                            "bytes": data or b"",
+                            "fileName": getattr(doc, 'fileName', 'unknown'),
+                            "mimeType": getattr(doc, 'mimeType', None) or "application/octet-stream"
+                        }],
+                        options={"ai": {"enabled": False}}
+                    )
+                    ec = extracted_list[0] if extracted_list else None
+                    if ec and hasattr(ec, 'parts'):
+                        for part in getattr(ec, 'parts', []):
+                            try:
+                                if getattr(part, 'typeGroup', '') in ("text", "table", "structure") and getattr(part, 'data', None):
+                                    content += part.data + " "
+                            except Exception:
+                                continue
+                        if content.strip():
+                            logger.info(f"   Retrieved content from file: {len(content)} characters")
+                        else:
+                            logger.info(f"   No readable text content found (binary file)")
+                    else:
+                        logger.info(f"   No content extracted (binary file)")
                 except Exception as e:
                     logger.info(f"   Could not extract content (binary file): {str(e)}")

             # Skip empty documents
             if content and content.strip():
diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
index 02fca00b..59616794 100644
--- a/modules/workflows/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -1392,45 +1392,53 @@ class MethodOutlook(MethodBase):

         composition_documents = []
         if documentList:
             try:
-                # Get document content from service center
                docs = 
self.services.workflow.getChatDocumentsFromDocumentList(documentList) if docs: + composition_documents.extend(docs) + # Batch extract summaries for AI context + batch_docs = [] for doc in docs: - composition_documents.append(doc) - - # Extract content for AI context using proper document service try: - if hasattr(doc, 'fileId') and doc.fileId: - # Use the document content extraction service instead of raw file reading - try: - extracted_content = await self.services.documentExtraction.extractContentFromDocument( - prompt="Extract readable text content for email composition", - document=doc - ) - - if extracted_content and extracted_content.contents: - # Get the first content item's data - content_text = "" - for content_item in extracted_content.contents: - if hasattr(content_item, 'data') and content_item.data: - content_text += content_item.data + " " + data = self.services.workflow.getFileData(doc.fileId) if hasattr(doc, 'fileId') else None + except Exception: + data = None + batch_docs.append({ + "id": getattr(doc, 'id', None), + "bytes": data or b"", + "fileName": getattr(doc, 'fileName', 'unknown'), + "mimeType": getattr(doc, 'mimeType', None) or "application/octet-stream" + }) - - if content_text.strip(): - # Truncate content for AI context (avoid token limits) - content_preview = content_text[:1000] + "..." if len(content_text) > 1000 else content_text - document_content_summary += f"\nDocument: {doc.fileName}\nContent Preview: {content_preview}\n" - # No content to extract - - except Exception as extract_error: - # Content extraction failed (normal for binary files) - pass - else: - logger.warning(f"Document {doc.fileName} has no fileId") - except Exception as e: - logger.warning(f"Error processing document {doc.fileName}: {str(e)}") + try: + extracted_list = await self.services.extraction.extractContentFromDocuments( + prompt="Extract readable text content for email composition", + documents=batch_docs, + options={"ai": {"enabled": False}} + ) + except Exception: + extracted_list = [] + + # Aggregate previews + def _partsToText(parts) -> str: + lines: List[str] = [] + for p in (parts or []): + try: + if getattr(p, 'typeGroup', '') in ("text", "table", "structure") and getattr(p, 'data', None): + lines.append(p.data) + except Exception: + continue + return "\n\n".join(lines) + + for i, doc in enumerate(docs): + try: + ec = extracted_list[i] if i < len(extracted_list) else None + content_text = _partsToText(getattr(ec, 'parts', [])) if ec else "" + if content_text.strip(): + content_preview = content_text[:1000] + "..." 
if len(content_text) > 1000 else content_text + document_content_summary += f"\nDocument: {doc.fileName}\nContent Preview: {content_preview}\n" + except Exception: + continue else: logger.warning("No documents found from documentList") except Exception as e: diff --git a/modules/workflows/methods/methodSharepoint.py b/modules/workflows/methods/methodSharepoint.py index f7b1202f..30c784bd 100644 --- a/modules/workflows/methods/methodSharepoint.py +++ b/modules/workflows/methods/methodSharepoint.py @@ -33,7 +33,7 @@ class MethodSharepoint(MethodBase): def _getMicrosoftConnection(self, connectionReference: str) -> Optional[Dict[str, Any]]: """Get Microsoft connection from connection reference and configure SharePoint service""" try: - userConnection = self.service.getUserConnectionFromConnectionReference(connectionReference) + userConnection = self.services.workflow.getUserConnectionFromConnectionReference(connectionReference) if not userConnection: logger.warning(f"No user connection found for reference: {connectionReference}") return None @@ -48,7 +48,7 @@ class MethodSharepoint(MethodBase): return None # Configure SharePoint service with the UserConnection - if not self.service.sharepoint.setAccessTokenFromConnection(userConnection): + if not self.services.sharepoint.setAccessTokenFromConnection(userConnection): logger.warning(f"Failed to configure SharePoint service with connection {userConnection.id}") return None @@ -363,11 +363,11 @@ class MethodSharepoint(MethodBase): async def _makeGraphApiCall(self, endpoint: str, method: str = "GET", data: bytes = None) -> Dict[str, Any]: """Make a Microsoft Graph API call with timeout and detailed logging""" try: - if not hasattr(self.service, 'sharepoint') or not self.service.sharepoint._target.access_token: + if not hasattr(self.services, 'sharepoint') or not self.services.sharepoint._target.access_token: return {"error": "SharePoint service not configured with access token"} headers = { - "Authorization": f"Bearer {self.service.sharepoint._target.access_token}", + "Authorization": f"Bearer {self.services.sharepoint._target.access_token}", "Content-Type": "application/json" if data and method != "PUT" else "application/octet-stream" if data else "application/json" } @@ -1014,7 +1014,7 @@ class MethodSharepoint(MethodBase): # For content download, we need to handle binary data try: async with aiohttp.ClientSession() as session: - headers = {"Authorization": f"Bearer {self.service.sharepoint._target.access_token}"} + headers = {"Authorization": f"Bearer {self.services.sharepoint._target.access_token}"} async with session.get(f"https://graph.microsoft.com/v1.0/{content_endpoint}", headers=headers) as response: if response.status == 200: content = await response.text() diff --git a/modules/workflows/processing/handlingTasks.py b/modules/workflows/processing/handlingTasks.py index fbd6163e..5b4f3f1a 100644 --- a/modules/workflows/processing/handlingTasks.py +++ b/modules/workflows/processing/handlingTasks.py @@ -16,7 +16,8 @@ from modules.datamodels.datamodelWorkflow import ( TaskResult, ReviewContext, TaskStatus, - ActionResult + ActionResult, + TaskAction ) from modules.datamodels.datamodelChat import ( WorkflowResult, @@ -47,7 +48,6 @@ from modules.workflows.processing.promptFactoryPlaceholders import ( extractUserLanguage, extractReviewContent ) -from modules.services.serviceDocument.mainServiceDocumentGeneration import DocumentGenerationService from modules.workflows.processing.promptFactory import methods from 
modules.workflows.processing.executionState import should_continue from modules.datamodels.datamodelAi import AiCallOptions, OperationType, ProcessingMode, Priority @@ -62,7 +62,6 @@ class HandlingTasks: def __init__(self, services, workflow=None): self.services = services self.workflow = workflow - self.documentGenerator = DocumentGenerationService(self.services.center) def _checkWorkflowStopped(self): """ @@ -71,7 +70,7 @@ class HandlingTasks: """ try: # Get the current workflow status from the database to avoid stale data - current_workflow = services.chatInterface.getWorkflow(self.service.workflow.id) + current_workflow = self.services.chatInterface.getWorkflow(self.workflow.id) if current_workflow and current_workflow.status == "stopped": logger.info("Workflow stopped by user, aborting execution") raise WorkflowStoppedException("Workflow was stopped by user") @@ -81,7 +80,7 @@ class HandlingTasks: except Exception as e: # If we can't get the current status due to other database issues, fall back to the in-memory object logger.warning(f"Could not check current workflow status from database: {str(e)}") - if self.service.workflow.status == "stopped": + if self.workflow and self.workflow.status == "stopped": logger.info("Workflow stopped by user (from in-memory object), aborting execution") raise WorkflowStoppedException("Workflow was stopped by user") @@ -137,7 +136,7 @@ class HandlingTasks: # Extract content for placeholders user_prompt = extractUserPrompt(task_planning_context) available_documents = extractAvailableDocuments(task_planning_context) - workflow_history = extractWorkflowHistory(self.service, task_planning_context) + workflow_history = extractWorkflowHistory(self.services, task_planning_context) # Create placeholders dictionary placeholders = { @@ -206,7 +205,7 @@ class HandlingTasks: # LANGUAGE DETECTION: Determine user language once for the entire workflow # Priority: 1. languageUserDetected from AI response, 2. service.user.language, 3. 
"en" detected_language = task_plan_dict.get('languageUserDetected', '').strip() - service_user_language = getattr(self.service.user, 'language', '') if self.service and self.service.user else '' + service_user_language = getattr(self.services.user, 'language', '') if self.services and self.services.user else '' if detected_language and len(detected_language) == 2: # Valid language code like "en", "de", "fr" user_language = detected_language @@ -219,8 +218,8 @@ class HandlingTasks: logger.info(f"Using default language: {user_language}") # Set the detected language in the service for use throughout the workflow - if self.service and self.service.user: - self.service.user.language = user_language + if self.services and self.services.user: + self.services.user.language = user_language logger.info(f"Set workflow user language to: {user_language}") tasks = [] @@ -414,9 +413,9 @@ class HandlingTasks: # Extract content for placeholders user_prompt = extractUserPrompt(action_context) available_documents = extractAvailableDocuments(action_context) - workflow_history = extractWorkflowHistory(self.service, action_context) - available_methods = extractAvailableMethods(self.service) - user_language = extractUserLanguage(self.service) + workflow_history = extractWorkflowHistory(self.services, action_context) + available_methods = extractAvailableMethods(self.services) + user_language = extractUserLanguage(self.services) # Create placeholders dictionary placeholders = { @@ -527,7 +526,7 @@ class HandlingTasks: # Extract content for placeholders user_prompt = extractUserPrompt(context) available_documents = extractAvailableDocuments(context) - user_language = extractUserLanguage(self.service) + user_language = extractUserLanguage(self.services) available_methods = extractAvailableMethods(self.service) # Create placeholders dictionary @@ -581,7 +580,7 @@ class HandlingTasks: method = action.get('method', '') name = action.get('name', '') action_signature = "" - if self.service and method in methods: + if self.services and method in methods: method_instance = methods[method]['instance'] action_signature = method_instance.getActionSignature(name) @@ -624,8 +623,8 @@ class HandlingTasks: parameters = param_obj.get('parameters', {}) if isinstance(param_obj, dict) else {} # Apply minimal defaults in-code (language) - if 'language' not in parameters and hasattr(self.service, 'user') and getattr(self.service.user, 'language', None): - parameters['language'] = self.service.user.language + if 'language' not in parameters and hasattr(self.services, 'user') and getattr(self.services.user, 'language', None): + parameters['language'] = self.services.user.language # Build a synthetic TaskAction for execution routing and labels current_round = getattr(self.workflow, 'currentRound', 0) @@ -718,7 +717,7 @@ class HandlingTasks: # Update workflow context for this task if task_index is not None: - self.service.setWorkflowContext(task_number=task_index) + self.services.setWorkflowContext(task_number=task_index) # Remove the increment call that causes double-increment bug # Create database log entry for task start in format expected by frontend @@ -765,7 +764,7 @@ class HandlingTasks: self._checkWorkflowStopped() # Update workflow[currentAction] for UI self.updateWorkflowBeforeExecutingAction(step) - self.service.setWorkflowContext(action_number=step) + self.services.setWorkflowContext(action_number=step) try: t0 = time.time() selection = await self.plan_select(context) @@ -864,7 +863,7 @@ class HandlingTasks: 
self.updateWorkflowBeforeExecutingAction(action_number) # Update workflow context for this action - self.service.setWorkflowContext(action_number=action_number) + self.services.setWorkflowContext(action_number=action_number) # Remove the increment call that causes double-increment bug # Log action start in format expected by frontend @@ -1483,7 +1482,7 @@ class HandlingTasks: message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index) if message: # Now create documents with the messageId - created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id) + created_documents = self.services.generation.createDocumentsFromActionResult(result, action, workflow, message.id) # Update the message with the created documents if created_documents: message.documents = created_documents @@ -1562,8 +1561,8 @@ class HandlingTasks: logger.info(f"Result label: {result_label} - No documents") # Get current workflow context and stats - workflow_context = self.service.getWorkflowContext() - workflow_stats = self.service.getWorkflowStats() + workflow_context = self.services.getWorkflowContext() + workflow_stats = self.services.getWorkflowStats() # Create a more meaningful message that includes task context task_objective = task_step.objective if task_step else 'Unknown task' diff --git a/modules/workflows/processing/promptFactory.py b/modules/workflows/processing/promptFactory.py index 2184471d..dfd14147 100644 --- a/modules/workflows/processing/promptFactory.py +++ b/modules/workflows/processing/promptFactory.py @@ -9,7 +9,7 @@ import inspect from typing import Any, Dict, List from modules.datamodels.datamodelWorkflow import TaskContext, ReviewContext, DocumentExchange from modules.datamodels.datamodelChat import ChatDocument -from modules.services.serviceDocument.subDocumentUtility import getFileExtension +from modules.services.serviceGeneration.subDocumentUtility import getFileExtension from modules.workflows.methods.methodBase import MethodBase # Set up logger diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index 0e31d97f..a98faaf3 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -625,8 +625,8 @@ class WorkflowManager: documents = [] for fileId in fileIds: try: - # Get file info from service - fileInfo = self.handlingTasks.service.methodService.getFileInfo(fileId) + # Get file info from unified workflow service + fileInfo = self.services.workflow.getFileInfo(fileId) if fileInfo: # Create document directly with all file attributes document = ChatDocument( @@ -647,4 +647,4 @@ class WorkflowManager: def _setUserLanguage(self, language: str) -> None: """Set user language for the service center""" - self.handlingTasks.service.user.language = language + self.services.user.language = language diff --git a/testdata/00Untitled.jpg b/testdata/00Untitled.jpg new file mode 100644 index 00000000..bb59fb54 Binary files /dev/null and b/testdata/00Untitled.jpg differ diff --git a/testdata/Muster_Kundenliste_Test1.xlsx b/testdata/Muster_Kundenliste_Test1.xlsx new file mode 100644 index 00000000..5f21e9cf Binary files /dev/null and b/testdata/Muster_Kundenliste_Test1.xlsx differ diff --git a/testdata/diagramm_komponenten.pdf b/testdata/diagramm_komponenten.pdf new file mode 100644 index 00000000..45ef5c4b Binary files /dev/null and b/testdata/diagramm_komponenten.pdf differ