From 2f7d73f2ce71062fcf85700317c3045492710dda Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 10 Jun 2025 18:43:00 +0200
Subject: [PATCH] unlimited file size and normalizer initially included
---
modules/interfaces/serviceChatModel.py | 2 +
modules/workflow/documentService.py | 106 -------
modules/workflow/processorDocument.py | 386 ++++++++++++++++++++++---
3 files changed, 346 insertions(+), 148 deletions(-)
delete mode 100644 modules/workflow/documentService.py
diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py
index 22a1f81f..fbf4ef30 100644
--- a/modules/interfaces/serviceChatModel.py
+++ b/modules/interfaces/serviceChatModel.py
@@ -88,6 +88,8 @@ class ContentItem(BaseModel, ModelMixin):
"""Individual content item from a document"""
label: str = Field(description="Content label (e.g., tab name, tag name)")
data: str = Field(description="Extracted text content")
+ mimeType: str = Field(default="application/octet-stream", description="MIME type of the content")
+ base64Encoded: bool = Field(default=False, description="Whether the data is base64 encoded")
metadata: ContentMetadata = Field(description="Content metadata")
# Register labels for ContentItem
diff --git a/modules/workflow/documentService.py b/modules/workflow/documentService.py
deleted file mode 100644
index 79e215c9..00000000
--- a/modules/workflow/documentService.py
+++ /dev/null
@@ -1,106 +0,0 @@
-"""
-Document Manager Module for handling document operations and content extraction.
-"""
-
-import base64
-import logging
-from typing import List, Optional, Dict, Any, Union
-from pathlib import Path
-import uuid
-
-from modules.interfaces.serviceChatModel import (
- ChatDocument,
- TaskDocument,
- ExtractedContent,
- ContentItem,
- ContentMetadata
-)
-from modules.workflow.serviceContainer import ServiceContainer
-from modules.workflow.processorDocument import DocumentProcessor
-
-logger = logging.getLogger(__name__)
-
-class DocumentManager:
- """Manager for document operations and content extraction"""
-
- def __init__(self, serviceContainer: ServiceContainer):
- self.service = serviceContainer
- self._processor = DocumentProcessor()
-
- async def extractFromChatDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent:
- """
- Extract content from a ChatDocument with AI processing.
-
- Args:
- prompt: Prompt for AI content extraction
- document: The ChatDocument to process
-
- Returns:
- ExtractedContent containing the processed content
- """
- # Convert ChatDocument to TaskDocument
- taskDoc = await self._convertToTaskDocument(document)
-
- # Process document using processor
- extractedContent = await self._processor.processDocument(taskDoc, prompt)
-
- # Update the objectId and objectType to reference the original ChatDocument
- extractedContent.objectId = document.id
- extractedContent.objectType = "ChatDocument"
-
- return extractedContent
-
- async def extractFromTaskDocument(self, prompt: str, document: TaskDocument) -> ExtractedContent:
- """
- Extract content directly from a task document.
-
- Args:
- prompt: The prompt to use for content extraction
- document: The task document to extract content from
-
- Returns:
- ExtractedContent containing the processed content
-
- Raises:
- ValueError: If document is invalid
- IOError: If file cannot be read
- """
- try:
- return await self._processor.processDocument(document, prompt)
- except Exception as e:
- logger.error(f"Error extracting from task document: {str(e)}")
- raise
-
- async def _convertToTaskDocument(self, chatDoc: ChatDocument) -> TaskDocument:
- """
- Convert a ChatDocument to a TaskDocument.
-
- Args:
- chatDoc: The chat document to convert
-
- Returns:
- TaskDocument containing the converted data
-
- Raises:
- ValueError: If document is invalid
- IOError: If file cannot be read
- """
- try:
- # Get file content
- fileContent = await self.service.functions.getFileData(chatDoc.fileId)
- if not fileContent:
- raise ValueError(f"Could not get content for file {chatDoc.fileId}")
-
- # Convert to base64
- base64Data = base64.b64encode(fileContent).decode('utf-8')
-
- return TaskDocument(
- id=str(uuid.uuid4()),
- filename=chatDoc.filename,
- fileSize=chatDoc.fileSize,
- mimeType=chatDoc.mimeType,
- data=base64Data
- )
- except Exception as e:
- logger.error(f"Error converting chat document to task document: {str(e)}")
- raise
diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py
index 4a72f9ea..51b02c67 100644
--- a/modules/workflow/processorDocument.py
+++ b/modules/workflow/processorDocument.py
@@ -6,6 +6,8 @@ import io
import base64
from datetime import datetime, UTC
from pathlib import Path
+import xml.etree.ElementTree as ET
+from bs4 import BeautifulSoup
from modules.interfaces.serviceChatModel import (
ChatDocument,
@@ -35,8 +37,11 @@ class DocumentProcessor:
def __init__(self, currentUser: Optional[User] = None):
"""Initialize the document processor."""
+
self.serviceManagement = getInterface(currentUser)
+
self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None
+
self.supportedTypes: Dict[str, Callable[[Union[ChatDocument, TaskDocument]], Awaitable[List[ContentItem]]]] = {
'text/plain': self._processText,
'text/csv': self._processCsv,
@@ -51,6 +56,22 @@ class DocumentProcessor:
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': self._processDocx,
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': self._processXlsx
}
+
+ self.chunkSizes = {
+ "text": 40000, # General text content
+ "plain": 40000, # Plain text
+ "csv": 40000, # CSV data
+ "json": 40000, # JSON data
+ "xml": 40000, # XML data
+ "html": 40000, # HTML content
+ "image": 1024 * 1024, # 1MB for images
+ "video": 5 * 1024 * 1024, # 5MB for video chunks
+ "binary": 1024 * 1024, # 1MB for binary data
+ "pdf": 40000, # PDF text content
+ "docx": 40000, # Word document text
+ "xlsx": 40000, # Excel data
+ "svg": 40000 # SVG content
+ }
def initialize(self) -> None:
"""Initialize the document processor."""
@@ -127,37 +148,7 @@ class DocumentProcessor:
if prompt and contentItems:
try:
# Process each content item with AI
- processedItems = []
- for item in contentItems:
- # Neutralize content if neutralizer is enabled
- contentToProcess = item.data
- if self._neutralizer and contentToProcess:
- contentToProcess = self._neutralizer.neutralize(contentToProcess)
-
- # Create AI prompt for this content
- aiPrompt = f"""
- Extract relevant information from this content based on the following prompt:
-
- PROMPT: {prompt}
-
- CONTENT:
- {contentToProcess}
-
- Return ONLY the extracted information in a clear, concise format.
- """
-
- # Get AI response
- response = await self.serviceManagement.callAi([
- {"role": "system", "content": "You are an expert at extracting relevant information from documents."},
- {"role": "user", "content": aiPrompt}
- ])
-
- # Update content with AI processed data
- processedItems.append(ContentItem(
- label=item.label,
- data=response.strip(),
- metadata=item.metadata
- ))
+ processedItems = await self._aiDataExtraction(contentItems, prompt)
contentItems = processedItems
@@ -235,7 +226,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="text/plain",
+ base64Encoded=False
)
)]
except Exception as e:
@@ -258,7 +251,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="text/csv",
+ base64Encoded=False
)
)]
except Exception as e:
@@ -284,7 +279,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="application/json",
+ base64Encoded=False
)
)]
except Exception as e:
@@ -307,7 +304,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="application/xml",
+ base64Encoded=False
)
)]
except Exception as e:
@@ -330,7 +329,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="text/html",
+ base64Encoded=False
)
)]
except Exception as e:
@@ -356,6 +357,8 @@ class DocumentProcessor:
data=content if isSvg else None,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
+ mimeType="image/svg+xml",
+ base64Encoded=False,
error=None if isSvg else "Invalid SVG content"
)
)]
@@ -383,7 +386,9 @@ class DocumentProcessor:
size=len(fileData),
width=img.width,
height=img.height,
- colorMode=img.mode
+ colorMode=img.mode,
+ mimeType=document.mimeType,
+ base64Encoded=True
)
# Convert image to base64 for storage
@@ -420,7 +425,9 @@ class DocumentProcessor:
pdfReader = PyPDF2.PdfReader(pdfStream)
metadata = ContentMetadata(
size=len(fileData),
- pages=len(pdfReader.pages)
+ pages=len(pdfReader.pages),
+ mimeType="application/pdf",
+ base64Encoded=False
)
# Extract text from all pages
@@ -433,7 +440,9 @@ class DocumentProcessor:
data=pageText,
metadata=ContentMetadata(
size=len(pageText.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="text/plain",
+ base64Encoded=False
)
))
@@ -456,7 +465,9 @@ class DocumentProcessor:
data=base64.b64encode(imageBytes).decode('utf-8'),
metadata=ContentMetadata(
size=len(imageBytes),
- pages=1
+ pages=1,
+ mimeType=f"image/{imageExt}",
+ base64Encoded=True
)
))
except Exception as imgE:
@@ -506,7 +517,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=len(doc.paragraphs)
+ pages=len(doc.paragraphs),
+ mimeType="text/plain",
+ base64Encoded=False
)
)]
except Exception as e:
@@ -551,7 +564,9 @@ class DocumentProcessor:
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
- pages=1
+ pages=1,
+ mimeType="text/csv",
+ base64Encoded=False
)
))
@@ -575,6 +590,8 @@ class DocumentProcessor:
data=base64.b64encode(fileData).decode('utf-8'),
metadata=ContentMetadata(
size=len(fileData),
+ mimeType=document.mimeType,
+ base64Encoded=True,
error="Unsupported file type"
)
)]
@@ -582,6 +599,291 @@ class DocumentProcessor:
logger.error(f"Error processing binary document: {str(e)}")
raise FileProcessingError(f"Failed to process binary document: {str(e)}")
+ async def _aiDataExtraction(self, contentItems: List[ContentItem], prompt: str) -> List[ContentItem]:
+ """
+ Process content items with AI, handling chunking based on content type.
+
+ Args:
+ contentItems: List of content items to process
+ prompt: Prompt for AI content extraction
+
+ Returns:
+ List of processed content items
+ """
+ processedItems = []
+
+ for item in contentItems:
+ try:
+ # Get content type from metadata
+ mimeType = item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain"
+
+ # Chunk content based on type
+ if mimeType.startswith('text/'):
+ chunks = self._chunkText(item.data, mimeType)
+ elif mimeType.startswith('image/'):
+ chunks = self._chunkImage(item.data)
+ elif mimeType.startswith('video/'):
+ chunks = self._chunkVideo(item.data)
+ else:
+ # Binary data - chunk with the binary chunker
+ chunks = self._chunkBinary(item.data)
+
+ # Process each chunk
+ chunkResults = []
+ for chunk in chunks:
+ # Neutralize content if neutralizer is enabled
+ contentToProcess = chunk
+ if self._neutralizer and contentToProcess:
+ contentToProcess = self._neutralizer.neutralize(contentToProcess)
+
+ # Create AI prompt for this chunk
+ aiPrompt = f"""
+ Extract relevant information from this content based on the following prompt:
+
+ PROMPT: {prompt}
+
+ CONTENT:
+ {contentToProcess}
+
+ Return ONLY the extracted information in a clear, concise format.
+ """
+
+ # Get AI response
+ response = await self.serviceManagement.callAi([
+ {"role": "system", "content": "You are an expert at extracting relevant information from documents."},
+ {"role": "user", "content": aiPrompt}
+ ])
+
+ chunkResults.append(response.strip())
+
+ # Combine chunk results
+ combinedResult = "\n".join(chunkResults)
+
+ # Update content with AI processed data
+ processedItems.append(ContentItem(
+ label=item.label,
+ data=combinedResult,
+ metadata=ContentMetadata(
+ size=len(combinedResult.encode('utf-8')),
+ pages=1,
+ mimeType="text/plain",
+ base64Encoded=False
+ )
+ ))
+
+ except Exception as e:
+ logger.error(f"Error processing content chunk: {str(e)}")
+ # Add original content if processing fails
+ processedItems.append(item)
+
+ return processedItems
+
+ def _chunkText(self, content: str, mimeType: str) -> List[str]:
+ """Chunk text content based on mime type"""
+ if mimeType == "text/plain":
+ return self._chunkPlainText(content)
+ elif mimeType == "text/csv":
+ return self._chunkCsv(content)
+ elif mimeType == "application/json":
+ return self._chunkJson(content)
+ elif mimeType == "application/xml":
+ return self._chunkXml(content)
+ elif mimeType == "text/html":
+ return self._chunkHtml(content)
+ else:
+ return self._chunkPlainText(content)
+
+ def _chunkPlainText(self, content: str) -> List[str]:
+ """Chunk plain text content"""
+ chunks = []
+ currentChunk = []
+ currentSize = 0
+
+ for line in content.split('\n'):
+ lineSize = len(line.encode('utf-8'))
+ if currentSize + lineSize > self.chunkSizes["plain"]:
+ if currentChunk:
+ chunks.append('\n'.join(currentChunk))
+ currentChunk = [line]
+ currentSize = lineSize
+ else:
+ currentChunk.append(line)
+ currentSize += lineSize
+
+ if currentChunk:
+ chunks.append('\n'.join(currentChunk))
+
+ return chunks
+
+ def _chunkCsv(self, content: str) -> List[str]:
+ """Chunk CSV content"""
+ chunks = []
+ currentChunk = []
+ currentSize = 0
+
+ for line in content.split('\n'):
+ lineSize = len(line.encode('utf-8'))
+ if currentSize + lineSize > self.chunkSizes["csv"]:
+ if currentChunk:
+ chunks.append('\n'.join(currentChunk))
+ currentChunk = [line]
+ currentSize = lineSize
+ else:
+ currentChunk.append(line)
+ currentSize += lineSize
+
+ if currentChunk:
+ chunks.append('\n'.join(currentChunk))
+
+ return chunks
+
+ def _chunkJson(self, content: str) -> List[str]:
+ """Chunk JSON content"""
+ try:
+ data = json.loads(content)
+ chunks = []
+ currentChunk = []
+ currentSize = 0
+
+ def processValue(value, path=""):
+ nonlocal currentChunk, currentSize
+ valueStr = json.dumps({path: value}) if path else json.dumps(value)
+ valueSize = len(valueStr.encode('utf-8'))
+
+ if currentSize + valueSize > self.chunkSizes["json"]:
+ if currentChunk:
+ chunks.append(json.dumps(currentChunk))
+ currentChunk = [value]
+ currentSize = valueSize
+ else:
+ currentChunk.append(value)
+ currentSize += valueSize
+
+ if isinstance(data, list):
+ for i, item in enumerate(data):
+ processValue(item, str(i))
+ elif isinstance(data, dict):
+ for key, value in data.items():
+ processValue(value, key)
+ else:
+ processValue(data)
+
+ if currentChunk:
+ chunks.append(json.dumps(currentChunk))
+
+ return chunks
+ except json.JSONDecodeError:
+ return [content]
+
+ def _chunkXml(self, content: str) -> List[str]:
+ """Chunk XML content"""
+ try:
+ root = ET.fromstring(content)
+ chunks = []
+ currentChunk = []
+ currentSize = 0
+
+ def processElement(element, path=""):
+ nonlocal currentChunk, currentSize
+ elementStr = ET.tostring(element, encoding='unicode')
+ elementSize = len(elementStr.encode('utf-8'))
+
+ if currentSize + elementSize > self.chunkSizes["xml"]:
+ if currentChunk:
+ chunks.append(''.join(currentChunk))
+ currentChunk = [elementStr]
+ currentSize = elementSize
+ else:
+ currentChunk.append(elementStr)
+ currentSize += elementSize
+
+ for child in root:
+ processElement(child)
+
+ if currentChunk:
+ chunks.append(''.join(currentChunk))
+
+ return chunks
+ except ET.ParseError:
+ return [content]
+
+ def _chunkHtml(self, content: str) -> List[str]:
+ """Chunk HTML content"""
+ try:
+ soup = BeautifulSoup(content, 'html.parser')
+ chunks = []
+ currentChunk = []
+ currentSize = 0
+
+ def processElement(element):
+ nonlocal currentChunk, currentSize
+ elementStr = str(element)
+ elementSize = len(elementStr.encode('utf-8'))
+
+ if currentSize + elementSize > self.chunkSizes["html"]:
+ if currentChunk:
+ chunks.append(''.join(currentChunk))
+ currentChunk = [elementStr]
+ currentSize = elementSize
+ else:
+ currentChunk.append(elementStr)
+ currentSize += elementSize
+
+ for element in soup.find_all(['p', 'div', 'section', 'article']):
+ processElement(element)
+
+ if currentChunk:
+ chunks.append(''.join(currentChunk))
+
+ return chunks
+ except Exception:
+ return [content]
+
+ def _chunkImage(self, content: str) -> List[str]:
+ """Chunk image content"""
+ try:
+ imageData = base64.b64decode(content)
+ chunks = []
+ chunkSize = self.chunkSizes["image"]
+
+ for i in range(0, len(imageData), chunkSize):
+ chunk = imageData[i:i + chunkSize]
+ chunks.append(base64.b64encode(chunk).decode('utf-8'))
+
+ return chunks
+ except Exception:
+ return [content]
+
+ def _chunkVideo(self, content: str) -> List[str]:
+ """Chunk video content"""
+ try:
+ videoData = base64.b64decode(content)
+ chunks = []
+ chunkSize = self.chunkSizes["video"]
+
+ for i in range(0, len(videoData), chunkSize):
+ chunk = videoData[i:i + chunkSize]
+ chunks.append(base64.b64encode(chunk).decode('utf-8'))
+
+ return chunks
+ except Exception:
+ return [content]
+
+ def _chunkBinary(self, content: str) -> List[str]:
+ """Chunk binary content"""
+ try:
+ binaryData = base64.b64decode(content)
+ chunks = []
+ chunkSize = self.chunkSizes["binary"]
+
+ for i in range(0, len(binaryData), chunkSize):
+ chunk = binaryData[i:i + chunkSize]
+ chunks.append(base64.b64encode(chunk).decode('utf-8'))
+
+ return chunks
+ except Exception:
+ return [content]
+
async def _extractText(self, content: bytes, mimeType: str) -> str:
"""Extract text content from various text formats"""
try: