diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py
index 22a1f81f..fbf4ef30 100644
--- a/modules/interfaces/serviceChatModel.py
+++ b/modules/interfaces/serviceChatModel.py
@@ -88,6 +88,8 @@ class ContentItem(BaseModel, ModelMixin):
     """Individual content item from a document"""
     label: str = Field(description="Content label (e.g., tab name, tag name)")
     data: str = Field(description="Extracted text content")
+    mimeType: str = Field(default="text/plain", description="MIME type of the content")
+    base64Encoded: bool = Field(default=False, description="Whether the data is base64 encoded")
     metadata: ContentMetadata = Field(description="Content metadata")
 
 # Register labels for ContentItem
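Note: the processorDocument.py hunks below pass these same mimeType / base64Encoded values as keyword arguments to ContentMetadata(...), so ContentMetadata is assumed to declare matching fields (that model is not part of this diff), and the new ContentItem fields are given defaults because existing ContentItem(...) calls do not set them. A minimal sketch of the intended shape under that assumption:

# Sketch only -- assumes ContentMetadata also declares mimeType/base64Encoded,
# matching the keyword arguments used in processorDocument.py further down.
from modules.interfaces.serviceChatModel import ContentItem, ContentMetadata

text = "col_a,col_b\n1,2"
item = ContentItem(
    label="Sheet1",
    data=text,
    mimeType="text/csv",        # new field on ContentItem
    base64Encoded=False,        # new field on ContentItem
    metadata=ContentMetadata(
        size=len(text.encode("utf-8")),
        pages=1,
        mimeType="text/csv",    # assumed matching field on ContentMetadata
        base64Encoded=False,
    ),
)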
diff --git a/modules/workflow/documentService.py b/modules/workflow/documentService.py
deleted file mode 100644
index 79e215c9..00000000
--- a/modules/workflow/documentService.py
+++ /dev/null
@@ -1,106 +0,0 @@
-"""
-Document Manager Module for handling document operations and content extraction.
-"""
-
-import base64
-import logging
-from typing import List, Optional, Dict, Any, Union
-from pathlib import Path
-import uuid
-
-from modules.interfaces.serviceChatModel import (
-    ChatDocument,
-    TaskDocument,
-    ExtractedContent,
-    ContentItem,
-    ContentMetadata
-)
-from modules.workflow.serviceContainer import ServiceContainer
-from modules.workflow.processorDocument import DocumentProcessor
-
-logger = logging.getLogger(__name__)
-
-class DocumentManager:
-    """Manager for document operations and content extraction"""
-
-    def __init__(self, serviceContainer: ServiceContainer):
-        self.service = serviceContainer
-        self._processor = DocumentProcessor()
-
-    async def extractFromChatDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent:
-        """
-        Extract content from a ChatDocument with AI processing.
-
-        Args:
-            prompt: Prompt for AI content extraction
-            document: The ChatDocument to process
-
-        Returns:
-            ExtractedContent containing the processed content
-        """
-        # Convert ChatDocument to TaskDocument
-        taskDoc = await self._convertToTaskDocument(document)
-
-        # Process document using processor
-        extractedContent = await self._processor.processDocument(taskDoc, prompt)
-
-        # Update the objectId and objectType to reference the original ChatDocument
-        extractedContent.objectId = document.id
-        extractedContent.objectType = "ChatDocument"
-
-        return extractedContent
-
-    async def extractFromTaskDocument(self, prompt: str, document: TaskDocument) -> ExtractedContent:
-        """
-        Extract content directly from a task document.
-
-        Args:
-            prompt: The prompt to use for content extraction
-            document: The task document to extract content from
-
-        Returns:
-            ExtractedContent containing the processed content
-
-        Raises:
-            ValueError: If document is invalid
-            IOError: If file cannot be read
-        """
-        try:
-            return await self._processor.processDocument(document, prompt)
-        except Exception as e:
-            logger.error(f"Error extracting from task document: {str(e)}")
-            raise
-
-    async def _convertToTaskDocument(self, chatDoc: ChatDocument) -> TaskDocument:
-        """
-        Convert a ChatDocument to a TaskDocument.
-
-        Args:
-            chatDoc: The chat document to convert
-
-        Returns:
-            TaskDocument containing the converted data
-
-        Raises:
-            ValueError: If document is invalid
-            IOError: If file cannot be read
-        """
-        try:
-            # Get file content
-            fileContent = await self.service.functions.getFileData(chatDoc.fileId)
-            if not fileContent:
-                raise ValueError(f"Could not get content for file {chatDoc.fileId}")
-
-            # Convert to base64
-            base64Data = base64.b64encode(fileContent).decode('utf-8')
-
-            return TaskDocument(
-                id=str(uuid.uuid4()),
-                filename=chatDoc.filename,
-                fileSize=chatDoc.fileSize,
-                mimeType=chatDoc.mimeType,
-                data=base64Data
-            )
-        except Exception as e:
-            logger.error(f"Error converting chat document to task document: {str(e)}")
-            raise
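With DocumentManager gone, this diff does not show where ChatDocument handling moved; the sketch below reproduces what the deleted extractFromChatDocument / _convertToTaskDocument pair did, as a caller would now have to do it directly against DocumentProcessor (the helper name and the `service` handle are placeholders, not part of this change):

# Illustrative only -- mirrors the deleted DocumentManager logic; `service` stands in
# for whatever object still exposes functions.getFileData().
import base64
import uuid

async def extractFromChat(service, processor: DocumentProcessor, prompt: str, chatDoc: ChatDocument) -> ExtractedContent:
    fileContent = await service.functions.getFileData(chatDoc.fileId)
    if not fileContent:
        raise ValueError(f"Could not get content for file {chatDoc.fileId}")
    taskDoc = TaskDocument(
        id=str(uuid.uuid4()),
        filename=chatDoc.filename,
        fileSize=chatDoc.fileSize,
        mimeType=chatDoc.mimeType,
        data=base64.b64encode(fileContent).decode('utf-8'),
    )
    extracted = await processor.processDocument(taskDoc, prompt)
    extracted.objectId = chatDoc.id
    extracted.objectType = "ChatDocument"
    return extracted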
diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py
index 4a72f9ea..51b02c67 100644
--- a/modules/workflow/processorDocument.py
+++ b/modules/workflow/processorDocument.py
@@ -6,6 +6,8 @@ import io
 import base64
 from datetime import datetime, UTC
 from pathlib import Path
+import xml.etree.ElementTree as ET
+from bs4 import BeautifulSoup
 
 from modules.interfaces.serviceChatModel import (
     ChatDocument,
@@ -35,8 +37,11 @@ class DocumentProcessor:
 
     def __init__(self, currentUser: Optional[User] = None):
         """Initialize the document processor."""
+        self.serviceManagement = getInterface(currentUser)
+        self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None
+
         self.supportedTypes: Dict[str, Callable[[Union[ChatDocument, TaskDocument]], Awaitable[List[ContentItem]]]] = {
             'text/plain': self._processText,
             'text/csv': self._processCsv,
@@ -51,6 +56,22 @@
             'application/vnd.openxmlformats-officedocument.wordprocessingml.document': self._processDocx,
             'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': self._processXlsx
         }
+
+        self.chunkSizes = {
+            "text": 40000,             # General text content
+            "plain": 40000,            # Plain text
+            "csv": 40000,              # CSV data
+            "json": 40000,             # JSON data
+            "xml": 40000,              # XML data
+            "html": 40000,             # HTML content
+            "image": 1024 * 1024,      # 1MB for images
+            "video": 5 * 1024 * 1024,  # 5MB for video chunks
+            "binary": 1024 * 1024,     # 1MB for binary data
+            "pdf": 40000,              # PDF text content
+            "docx": 40000,             # Word document text
+            "xlsx": 40000,             # Excel data
+            "svg": 40000               # SVG content
+        }
 
     def initialize(self) -> None:
         """Initialize the document processor."""
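The text limits above are compared against per-line UTF-8 byte counts by the chunk helpers added at the end of this diff, and a single line longer than its limit is still emitted as one oversized chunk (lines are never split mid-way); the image/video/binary limits are applied to the base64-decoded payload instead. A rough illustration, assuming `processor` is an already-initialized DocumentProcessor:

# Illustration only -- `processor` is a placeholder for an initialized DocumentProcessor.
longText = "\n".join(f"row {i}" for i in range(20000))         # well over the 40000-byte "plain" limit
chunks = processor._chunkPlainText(longText)
print(len(chunks), [len(c.encode("utf-8")) for c in chunks])   # a handful of chunks, each near 40000 bytes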
@@ -127,37 +148,7 @@ class DocumentProcessor:
         if prompt and contentItems:
             try:
                 # Process each content item with AI
-                processedItems = []
-                for item in contentItems:
-                    # Neutralize content if neutralizer is enabled
-                    contentToProcess = item.data
-                    if self._neutralizer and contentToProcess:
-                        contentToProcess = self._neutralizer.neutralize(contentToProcess)
-
-                    # Create AI prompt for this content
-                    aiPrompt = f"""
-                    Extract relevant information from this content based on the following prompt:
-
-                    PROMPT: {prompt}
-
-                    CONTENT:
-                    {contentToProcess}
-
-                    Return ONLY the extracted information in a clear, concise format.
-                    """
-
-                    # Get AI response
-                    response = await self.serviceManagement.callAi([
-                        {"role": "system", "content": "You are an expert at extracting relevant information from documents."},
-                        {"role": "user", "content": aiPrompt}
-                    ])
-
-                    # Update content with AI processed data
-                    processedItems.append(ContentItem(
-                        label=item.label,
-                        data=response.strip(),
-                        metadata=item.metadata
-                    ))
+                processedItems = await self._aiDataExtraction(contentItems, prompt)
 
                 contentItems = processedItems
 
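The loop that used to live here is now the _aiDataExtraction helper added at the end of this diff: it picks a chunker from the item's MIME type, optionally neutralizes each chunk, calls the AI once per chunk, and newline-joins the per-chunk answers into a single text/plain item. A minimal sketch of that contract (`processor` and `hugeCsvText` are placeholders, and the ContentMetadata fields are assumed as noted earlier):

# Sketch only -- runs inside a coroutine; `processor` is an initialized DocumentProcessor.
async def summariseSheet(processor, hugeCsvText: str) -> str:
    items = [ContentItem(
        label="Sheet1",
        data=hugeCsvText,
        metadata=ContentMetadata(size=len(hugeCsvText.encode("utf-8")), pages=1,
                                 mimeType="text/csv", base64Encoded=False),
    )]
    processed = await processor._aiDataExtraction(items, "List every customer name mentioned.")
    # One item comes back per input; its data is the newline-joined per-chunk AI output.
    return processed[0].data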
@@ -235,7 +226,9 @@
                 data=content,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
-                    pages=1
+                    pages=1,
+                    mimeType="text/plain",
+                    base64Encoded=False
                 )
             )]
         except Exception as e:
@@ -258,7 +251,9 @@
                 data=content,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
-                    pages=1
+                    pages=1,
+                    mimeType="text/csv",
+                    base64Encoded=False
                 )
             )]
         except Exception as e:
@@ -284,7 +279,9 @@
                 data=content,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
-                    pages=1
+                    pages=1,
+                    mimeType="application/json",
+                    base64Encoded=False
                 )
             )]
         except Exception as e:
@@ -307,7 +304,9 @@
                 data=content,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
-                    pages=1
+                    pages=1,
+                    mimeType="application/xml",
+                    base64Encoded=False
                 )
             )]
         except Exception as e:
@@ -330,7 +329,9 @@
                 data=content,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
-                    pages=1
+                    pages=1,
+                    mimeType="text/html",
+                    base64Encoded=False
                 )
             )]
         except Exception as e:
@@ -356,6 +357,8 @@
                 data=content if isSvg else None,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
+                    mimeType="image/svg+xml",
+                    base64Encoded=False,
                     error=None if isSvg else "Invalid SVG content"
                 )
             )]
@@ -383,7 +386,9 @@
                 size=len(fileData),
                 width=img.width,
                 height=img.height,
-                colorMode=img.mode
+                colorMode=img.mode,
+                mimeType=document.mimeType,
+                base64Encoded=True
             )
 
             # Convert image to base64 for storage
@@ -420,7 +425,9 @@
             pdfReader = PyPDF2.PdfReader(pdfStream)
             metadata = ContentMetadata(
                 size=len(fileData),
-                pages=len(pdfReader.pages)
+                pages=len(pdfReader.pages),
+                mimeType="application/pdf",
+                base64Encoded=False
             )
 
             # Extract text from all pages
@@ -433,7 +440,9 @@
                     data=pageText,
                     metadata=ContentMetadata(
                         size=len(pageText.encode('utf-8')),
-                        pages=1
+                        pages=1,
+                        mimeType="text/plain",
+                        base64Encoded=False
                     )
                 ))
 
@@ -456,7 +465,9 @@
                         data=base64.b64encode(imageBytes).decode('utf-8'),
                         metadata=ContentMetadata(
                             size=len(imageBytes),
-                            pages=1
+                            pages=1,
+                            mimeType=f"image/{imageExt}",
+                            base64Encoded=True
                         )
                     ))
                 except Exception as imgE:
@@ -506,7 +517,9 @@
                 data=content,
                 metadata=ContentMetadata(
                     size=len(content.encode('utf-8')),
-                    pages=len(doc.paragraphs)
+                    pages=len(doc.paragraphs),
+                    mimeType="text/plain",
+                    base64Encoded=False
                 )
             )]
         except Exception as e:
@@ -551,7 +564,9 @@
                     data=content,
                     metadata=ContentMetadata(
                         size=len(content.encode('utf-8')),
-                        pages=1
+                        pages=1,
+                        mimeType="text/csv",
+                        base64Encoded=False
                     )
                 ))
 
@@ -575,6 +590,8 @@
             data=base64.b64encode(fileData).decode('utf-8'),
             metadata=ContentMetadata(
                 size=len(fileData),
+                mimeType=document.mimeType,
+                base64Encoded=True,
                 error="Unsupported file type"
             )
         )]
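The _chunkImage / _chunkVideo / _chunkBinary helpers added in the next hunk all follow the same pattern: decode the base64 payload once, slice the raw bytes at the configured limit, and re-encode each slice, so every chunk is independently valid base64 (slicing the base64 string itself would break the 3-byte alignment). A self-contained sketch of that round trip:

# Standalone sketch of the decode-slice-reencode pattern used by the new helpers.
import base64
from typing import List

def rejoinChunks(chunks: List[str]) -> bytes:
    """Reassemble the original payload from per-chunk base64 strings."""
    return b"".join(base64.b64decode(c) for c in chunks)

payload = bytes(range(256)) * 5000                      # ~1.25 MB of synthetic binary data
encoded = base64.b64encode(payload).decode("utf-8")

chunkSize = 1024 * 1024                                 # mirrors chunkSizes["image"] / ["binary"]
raw = base64.b64decode(encoded)
chunks = [base64.b64encode(raw[i:i + chunkSize]).decode("utf-8")
          for i in range(0, len(raw), chunkSize)]

assert rejoinChunks(chunks) == payload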
@@ -582,6 +599,291 @@
             logger.error(f"Error processing binary document: {str(e)}")
             raise FileProcessingError(f"Failed to process binary document: {str(e)}")
 
+    async def _aiDataExtraction(self, contentItems: List[ContentItem], prompt: str) -> List[ContentItem]:
+        """
+        Process content items with AI, handling chunking based on content type.
+
+        Args:
+            contentItems: List of content items to process
+            prompt: Prompt for AI content extraction
+
+        Returns:
+            List of processed content items
+        """
+        processedItems = []
+
+        for item in contentItems:
+            try:
+                # Get content type from metadata
+                mimeType = item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain"
+
+                # Chunk content based on type (the chunk helpers are synchronous)
+                if mimeType.startswith('text/'):
+                    chunks = self._chunkText(item.data, mimeType)
+                elif mimeType.startswith('image/'):
+                    chunks = self._chunkImage(item.data)
+                elif mimeType.startswith('video/'):
+                    chunks = self._chunkVideo(item.data)
+                else:
+                    # Binary data - no chunking
+                    chunks = [item.data]
+
+                # Process each chunk
+                chunkResults = []
+                for chunk in chunks:
+                    # Neutralize content if neutralizer is enabled
+                    contentToProcess = chunk
+                    if self._neutralizer and contentToProcess:
+                        contentToProcess = self._neutralizer.neutralize(contentToProcess)
+
+                    # Create AI prompt for this chunk
+                    aiPrompt = f"""
+                    Extract relevant information from this content based on the following prompt:
+
+                    PROMPT: {prompt}
+
+                    CONTENT:
+                    {contentToProcess}
+
+                    Return ONLY the extracted information in a clear, concise format.
+                    """
+
+                    # Get AI response
+                    response = await self.serviceManagement.callAi([
+                        {"role": "system", "content": "You are an expert at extracting relevant information from documents."},
+                        {"role": "user", "content": aiPrompt}
+                    ])
+
+                    chunkResults.append(response.strip())
+
+                # Combine chunk results
+                combinedResult = "\n".join(chunkResults)
+
+                # Update content with AI processed data
+                processedItems.append(ContentItem(
+                    label=item.label,
+                    data=combinedResult,
+                    metadata=ContentMetadata(
+                        size=len(combinedResult.encode('utf-8')),
+                        pages=1,
+                        mimeType="text/plain",
+                        base64Encoded=False
+                    )
+                ))
+
+            except Exception as e:
+                logger.error(f"Error processing content chunk: {str(e)}")
+                # Add original content if processing fails
+                processedItems.append(item)
+
+        return processedItems
+
+    def _chunkText(self, content: str, mimeType: str) -> List[str]:
+        """Chunk text content based on mime type"""
+        if mimeType == "text/plain":
+            return self._chunkPlainText(content)
+        elif mimeType == "text/csv":
+            return self._chunkCsv(content)
+        elif mimeType == "application/json":
+            return self._chunkJson(content)
+        elif mimeType == "application/xml":
+            return self._chunkXml(content)
+        elif mimeType == "text/html":
+            return self._chunkHtml(content)
+        else:
+            return self._chunkPlainText(content)
+
+    def _chunkPlainText(self, content: str) -> List[str]:
+        """Chunk plain text content"""
+        chunks = []
+        currentChunk = []
+        currentSize = 0
+
+        for line in content.split('\n'):
+            lineSize = len(line.encode('utf-8'))
+            if currentSize + lineSize > self.chunkSizes["plain"]:
+                if currentChunk:
+                    chunks.append('\n'.join(currentChunk))
+                currentChunk = [line]
+                currentSize = lineSize
+            else:
+                currentChunk.append(line)
+                currentSize += lineSize
+
+        if currentChunk:
+            chunks.append('\n'.join(currentChunk))
+
+        return chunks
+
+    def _chunkCsv(self, content: str) -> List[str]:
+        """Chunk CSV content"""
+        chunks = []
+        currentChunk = []
+        currentSize = 0
+
+        for line in content.split('\n'):
+            lineSize = len(line.encode('utf-8'))
+            if currentSize + lineSize > self.chunkSizes["csv"]:
+                if currentChunk:
+                    chunks.append('\n'.join(currentChunk))
+                currentChunk = [line]
+                currentSize = lineSize
+            else:
+                currentChunk.append(line)
+                currentSize += lineSize
+
+        if currentChunk:
+            chunks.append('\n'.join(currentChunk))
+
+        return chunks
+
+    def _chunkJson(self, content: str) -> List[str]:
+        """Chunk JSON content"""
+        try:
+            data = json.loads(content)
+            chunks = []
+            currentChunk = []
+            currentSize = 0
+
+            def processValue(value, path=""):
+                nonlocal currentChunk, currentSize
+                valueStr = json.dumps({path: value}) if path else json.dumps(value)
+                valueSize = len(valueStr.encode('utf-8'))
+
+                if currentSize + valueSize > self.chunkSizes["json"]:
+                    if currentChunk:
+                        chunks.append(json.dumps(currentChunk))
+                    currentChunk = [value]
+                    currentSize = valueSize
+                else:
+                    currentChunk.append(value)
+                    currentSize += valueSize
+
+            if isinstance(data, list):
+                for i, item in enumerate(data):
+                    processValue(item, str(i))
+            elif isinstance(data, dict):
+                for key, value in data.items():
+                    processValue(value, key)
+            else:
+                processValue(data)
+
+            if currentChunk:
+                chunks.append(json.dumps(currentChunk))
+
+            return chunks
+        except json.JSONDecodeError:
+            return [content]
+
+    def _chunkXml(self, content: str) -> List[str]:
+        """Chunk XML content"""
+        try:
+            root = ET.fromstring(content)
+            chunks = []
+            currentChunk = []
+            currentSize = 0
+
+            def processElement(element, path=""):
+                nonlocal currentChunk, currentSize
+                elementStr = ET.tostring(element, encoding='unicode')
+                elementSize = len(elementStr.encode('utf-8'))
+
+                if currentSize + elementSize > self.chunkSizes["xml"]:
+                    if currentChunk:
+                        chunks.append(''.join(currentChunk))
+                    currentChunk = [elementStr]
+                    currentSize = elementSize
+                else:
+                    currentChunk.append(elementStr)
+                    currentSize += elementSize
+
+            for child in root:
+                processElement(child)
+
+            if currentChunk:
+                chunks.append(''.join(currentChunk))
+
+            return chunks
+        except ET.ParseError:
+            return [content]
+
+    def _chunkHtml(self, content: str) -> List[str]:
+        """Chunk HTML content"""
+        try:
+            soup = BeautifulSoup(content, 'html.parser')
+            chunks = []
+            currentChunk = []
+            currentSize = 0
+
+            def processElement(element):
+                nonlocal currentChunk, currentSize
+                elementStr = str(element)
+                elementSize = len(elementStr.encode('utf-8'))
+
+                if currentSize + elementSize > self.chunkSizes["html"]:
+                    if currentChunk:
+                        chunks.append(''.join(currentChunk))
+                    currentChunk = [elementStr]
+                    currentSize = elementSize
+                else:
+                    currentChunk.append(elementStr)
+                    currentSize += elementSize
+
+            for element in soup.find_all(['p', 'div', 'section', 'article']):
+                processElement(element)
+
+            if currentChunk:
+                chunks.append(''.join(currentChunk))
+
+            return chunks
+        except Exception:
+            return [content]
+
+    def _chunkImage(self, content: str) -> List[str]:
+        """Chunk image content"""
+        try:
+            imageData = base64.b64decode(content)
+            chunks = []
+            chunkSize = self.chunkSizes["image"]
+
+            for i in range(0, len(imageData), chunkSize):
+                chunk = imageData[i:i + chunkSize]
+                chunks.append(base64.b64encode(chunk).decode('utf-8'))
+
+            return chunks
+        except Exception:
+            return [content]
+
+    def _chunkVideo(self, content: str) -> List[str]:
+        """Chunk video content"""
+        try:
+            videoData = base64.b64decode(content)
+            chunks = []
+            chunkSize = self.chunkSizes["video"]
+
+            for i in range(0, len(videoData), chunkSize):
+                chunk = videoData[i:i + chunkSize]
+                chunks.append(base64.b64encode(chunk).decode('utf-8'))
+
+            return chunks
+        except Exception:
+            return [content]
+
+    def _chunkBinary(self, content: str) -> List[str]:
+        """Chunk binary content"""
+        try:
+            binaryData = base64.b64decode(content)
+            chunks = []
+            chunkSize = self.chunkSizes["binary"]
+
+            for i in range(0, len(binaryData), chunkSize):
+                chunk = binaryData[i:i + chunkSize]
+                chunks.append(base64.b64encode(chunk).decode('utf-8'))
+
+            return chunks
+        except Exception:
+            return [content]
+
     async def _extractText(self, content: bytes, mimeType: str) -> str:
         """Extract text content from various text formats"""
         try: