# gateway/modules/workflow/processorDocument.py
from typing import Dict, List, Optional, Callable, Awaitable
import logging
import json
import io
import base64
import uuid
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
from modules.interfaces.interfaceChatModel import (
ExtractedContent,
ContentItem,
ContentMetadata
)
from modules.neutralizer.neutralizer import DataAnonymizer
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Optional imports - only loaded when needed
pdfExtractorLoaded = False
officeExtractorLoaded = False
imageProcessorLoaded = False
class FileProcessingError(Exception):
"""Custom exception for file processing errors."""
pass
class DocumentProcessor:
"""Processor for handling document operations and content extraction."""
def __init__(self, serviceCenter=None):
"""Initialize the document processor."""
self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None
self._serviceCenter = serviceCenter
self.supportedTypes: Dict[str, Callable[[bytes, str, str], Awaitable[List[ContentItem]]]] = {
'text/plain': self._processText,
'text/csv': self._processCsv,
'application/json': self._processJson,
'application/xml': self._processXml,
'text/html': self._processHtml,
'image/svg+xml': self._processSvg,
'image/jpeg': self._processImage,
'image/png': self._processImage,
'image/gif': self._processImage,
'application/pdf': self._processPdf,
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': self._processDocx,
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': self._processXlsx
}
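        # Anything not listed above falls back to _processBinary (see processFileData).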
self.chunkSizes = {
"text": 40000, # General text content
"plain": 40000, # Plain text
"csv": 40000, # CSV data
"json": 40000, # JSON data
"xml": 40000, # XML data
"html": 40000, # HTML content
"image": 1024 * 1024, # 1MB for images
"video": 5 * 1024 * 1024, # 5MB for video chunks
"binary": 1024 * 1024, # 1MB for binary data
"pdf": 40000, # PDF text content
"docx": 40000, # Word document text
"xlsx": 40000, # Excel data
"svg": 40000 # SVG content
}
def initialize(self) -> None:
"""Initialize the document processor."""
pass
def _loadPdfExtractor(self):
"""Loads PDF extraction libraries when needed"""
global pdfExtractorLoaded
if not pdfExtractorLoaded:
try:
global PyPDF2, fitz
import PyPDF2
import fitz # PyMuPDF for more extensive PDF processing
pdfExtractorLoaded = True
logger.info("PDF extraction libraries successfully loaded")
except ImportError as e:
logger.warning(f"PDF extraction libraries could not be loaded: {e}")
def _loadOfficeExtractor(self):
"""Loads Office document extraction libraries when needed"""
global officeExtractorLoaded
if not officeExtractorLoaded:
try:
global docx, openpyxl
import docx # python-docx for Word documents
import openpyxl # for Excel files
officeExtractorLoaded = True
logger.info("Office extraction libraries successfully loaded")
except ImportError as e:
logger.warning(f"Office extraction libraries could not be loaded: {e}")
def _loadImageProcessor(self):
"""Loads image processing libraries when needed"""
global imageProcessorLoaded
if not imageProcessorLoaded:
try:
                global Image
                from PIL import Image
imageProcessorLoaded = True
logger.info("Image processing libraries successfully loaded")
except ImportError as e:
logger.warning(f"Image processing libraries could not be loaded: {e}")
    async def processFileData(self, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, prompt: Optional[str] = None, documentId: Optional[str] = None) -> ExtractedContent:
"""
Process file data directly and extract its contents with AI processing.
Args:
fileData: Raw file data as bytes
filename: Name of the file
mimeType: MIME type of the file
base64Encoded: Whether the data is base64 encoded
            prompt: Optional prompt for AI content extraction
            documentId: Optional ID for the returned content; a new UUID is generated if omitted
Returns:
ExtractedContent containing the processed content
Raises:
FileProcessingError: If document processing fails
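
        Example (a sketch; `center` stands in for a hypothetical service-center instance):
            processor = DocumentProcessor(serviceCenter=center)
            extracted = await processor.processFileData(data, "report.pdf", "application/pdf")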
"""
try:
# Decode base64 if needed
if base64Encoded:
fileData = base64.b64decode(fileData)
            # Detect content type if needed (requires the service center)
            if mimeType == "application/octet-stream" and self._serviceCenter:
                mimeType = self._serviceCenter.detectContentTypeFromData(fileData, filename)
            # Dispatch on MIME type; unknown types fall back to binary handling
            if mimeType not in self.supportedTypes:
                contentItems = await self._processBinary(fileData, filename, mimeType)
            else:
                processor = self.supportedTypes[mimeType]
                contentItems = await processor(fileData, filename, mimeType)
# Process with AI if prompt provided
if prompt and contentItems:
try:
# Process each content item with AI
processedItems = await self._aiDataExtraction(contentItems, prompt)
contentItems = processedItems
except Exception as e:
logger.error(f"Error processing content with AI: {str(e)}")
return ExtractedContent(
id=documentId if documentId else str(uuid.uuid4()),
contents=contentItems
)
        except FileProcessingError:
            raise
        except Exception as e:
            logger.error(f"Error processing file data: {str(e)}")
            raise FileProcessingError(f"Failed to process file data: {str(e)}") from e
async def _processText(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process text document"""
try:
content = fileData.decode('utf-8')
return [ContentItem(
label="main",
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
pages=1,
mimeType="text/plain",
base64Encoded=False
)
)]
except Exception as e:
logger.error(f"Error processing text document: {str(e)}")
raise FileProcessingError(f"Failed to process text document: {str(e)}")
async def _processCsv(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process CSV document"""
try:
content = fileData.decode('utf-8')
return [ContentItem(
label="main",
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
pages=1,
mimeType="text/csv",
base64Encoded=False
)
)]
except Exception as e:
logger.error(f"Error processing CSV document: {str(e)}")
raise FileProcessingError(f"Failed to process CSV document: {str(e)}")
async def _processJson(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process JSON document"""
try:
content = fileData.decode('utf-8')
            # Parse JSON to validate; the raw text is returned unchanged
            json.loads(content)
return [ContentItem(
label="main",
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
pages=1,
mimeType="application/json",
base64Encoded=False
)
)]
except Exception as e:
logger.error(f"Error processing JSON document: {str(e)}")
raise FileProcessingError(f"Failed to process JSON document: {str(e)}")
async def _processXml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process XML document"""
try:
content = fileData.decode('utf-8')
return [ContentItem(
label="main",
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
pages=1,
mimeType="application/xml",
base64Encoded=False
)
)]
except Exception as e:
logger.error(f"Error processing XML document: {str(e)}")
raise FileProcessingError(f"Failed to process XML document: {str(e)}")
async def _processHtml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process HTML document"""
try:
content = fileData.decode('utf-8')
return [ContentItem(
label="main",
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
pages=1,
mimeType="text/html",
base64Encoded=False
)
)]
except Exception as e:
logger.error(f"Error processing HTML document: {str(e)}")
raise FileProcessingError(f"Failed to process HTML document: {str(e)}")
async def _processSvg(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process SVG document"""
try:
content = fileData.decode('utf-8')
# Check if it's actually SVG
isSvg = "<svg" in content.lower()
return [ContentItem(
label="main",
data=content if isSvg else None,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
mimeType="image/svg+xml",
base64Encoded=False,
error=None if isSvg else "Invalid SVG content"
)
)]
except Exception as e:
logger.error(f"Error processing SVG document: {str(e)}")
raise FileProcessingError(f"Failed to process SVG document: {str(e)}")
async def _processImage(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process image document"""
try:
self._loadImageProcessor()
if not imageProcessorLoaded:
raise FileProcessingError("Image processing libraries not available")
with io.BytesIO(fileData) as imgStream:
img = Image.open(imgStream)
metadata = ContentMetadata(
size=len(fileData),
width=img.width,
height=img.height,
colorMode=img.mode,
mimeType=mimeType,
base64Encoded=True
)
# Convert image to base64 for storage
imgStream.seek(0)
imgData = base64.b64encode(imgStream.read()).decode('utf-8')
return [ContentItem(
label="image",
data=imgData,
metadata=metadata
)]
except Exception as e:
logger.error(f"Error processing image document: {str(e)}")
raise FileProcessingError(f"Failed to process image document: {str(e)}")
async def _processPdf(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process PDF document"""
try:
self._loadPdfExtractor()
if not pdfExtractorLoaded:
raise FileProcessingError("PDF extraction libraries not available")
contentItems = []
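            # Two passes over the same bytes: PyPDF2 for per-page text,
            # then PyMuPDF (fitz) for embedded images.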
with io.BytesIO(fileData) as pdfStream:
# Extract text with PyPDF2
pdfReader = PyPDF2.PdfReader(pdfStream)
metadata = ContentMetadata(
size=len(fileData),
pages=len(pdfReader.pages),
mimeType="application/pdf",
base64Encoded=False
)
# Extract text from all pages
for pageNum in range(len(pdfReader.pages)):
page = pdfReader.pages[pageNum]
pageText = page.extract_text()
if pageText:
contentItems.append(ContentItem(
label=f"page_{pageNum + 1}",
data=pageText,
metadata=ContentMetadata(
size=len(pageText.encode('utf-8')),
pages=1,
mimeType="text/plain",
base64Encoded=False
)
))
# Extract images with PyMuPDF
pdfStream.seek(0)
doc = fitz.open(stream=pdfStream, filetype="pdf")
for pageNum in range(len(doc)):
page = doc[pageNum]
for imgIndex, imgInfo in enumerate(page.get_images(full=True)):
try:
xref = imgInfo[0]
baseImage = doc.extract_image(xref)
if baseImage:
imageBytes = baseImage.get("image", b"")
imageExt = baseImage.get("ext", "png")
if imageBytes:
contentItems.append(ContentItem(
label=f"image_{pageNum + 1}_{imgIndex}",
data=base64.b64encode(imageBytes).decode('utf-8'),
metadata=ContentMetadata(
size=len(imageBytes),
pages=1,
mimeType=f"image/{imageExt}",
base64Encoded=True
)
))
except Exception as imgE:
logger.warning(f"Error extracting image {imgIndex} on page {pageNum + 1}: {str(imgE)}")
doc.close()
return contentItems
except Exception as e:
logger.error(f"Error processing PDF document: {str(e)}")
raise FileProcessingError(f"Failed to process PDF document: {str(e)}")
async def _processDocx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process Word document"""
try:
self._loadOfficeExtractor()
if not officeExtractorLoaded:
raise FileProcessingError("Office extraction libraries not available")
with io.BytesIO(fileData) as docxStream:
doc = docx.Document(docxStream)
# Extract text
fullText = []
for para in doc.paragraphs:
fullText.append(para.text)
# Extract tables
for table in doc.tables:
for row in table.rows:
rowText = []
for cell in row.cells:
rowText.append(cell.text)
fullText.append(" | ".join(rowText))
content = "\n".join(fullText)
return [ContentItem(
label="main",
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
                        pages=len(doc.paragraphs),  # paragraph count as a proxy; python-docx does not expose page numbers
mimeType="text/plain",
base64Encoded=False
)
)]
except Exception as e:
logger.error(f"Error processing Word document: {str(e)}")
raise FileProcessingError(f"Failed to process Word document: {str(e)}")
async def _processXlsx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process Excel document"""
try:
self._loadOfficeExtractor()
if not officeExtractorLoaded:
raise FileProcessingError("Office extraction libraries not available")
contentItems = []
with io.BytesIO(fileData) as xlsxStream:
workbook = openpyxl.load_workbook(xlsxStream, data_only=True)
for sheetName in workbook.sheetnames:
sheet = workbook[sheetName]
csvRows = []
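                    # Serialize each row as CSV: every field quoted, embedded
                    # double quotes doubled (RFC 4180 style).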
for row in sheet.iter_rows():
csvRow = []
for cell in row:
value = cell.value
if value is None:
csvRow.append("")
else:
csvRow.append(str(value).replace('"', '""'))
csvRows.append(','.join(f'"{cell}"' for cell in csvRow))
content = "\n".join(csvRows)
contentItems.append(ContentItem(
label=sheetName,
data=content,
metadata=ContentMetadata(
size=len(content.encode('utf-8')),
pages=1,
mimeType="text/csv",
base64Encoded=False
)
))
return contentItems
except Exception as e:
logger.error(f"Error processing Excel document: {str(e)}")
raise FileProcessingError(f"Failed to process Excel document: {str(e)}")
async def _processBinary(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
"""Process binary document"""
try:
return [ContentItem(
label="binary",
data=base64.b64encode(fileData).decode('utf-8'),
metadata=ContentMetadata(
size=len(fileData),
mimeType=mimeType,
base64Encoded=True,
error="Unsupported file type"
)
)]
except Exception as e:
logger.error(f"Error processing binary document: {str(e)}")
raise FileProcessingError(f"Failed to process binary document: {str(e)}")
async def _aiDataExtraction(self, contentItems: List[ContentItem], prompt: str) -> List[ContentItem]:
"""
Process content items with AI, handling chunking based on content type.
Args:
contentItems: List of content items to process
prompt: Prompt for AI content extraction
Returns:
List of processed content items
"""
processedItems = []
for item in contentItems:
try:
# Get content type from metadata
mimeType = item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain"
logger.debug(f"Processing content item with MIME type: {mimeType}, label: {item.label}")
# Chunk content based on type
if mimeType.startswith('text/'):
chunks = self._chunkText(item.data, mimeType)
elif mimeType.startswith('image/'):
# Images should not be chunked - process as single unit
chunks = [item.data]
                elif mimeType == "application/pdf":
                    chunks = await self._chunkPdf(item.data)
                elif mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                    chunks = await self._chunkDocx(item.data)
                elif mimeType == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
                    chunks = await self._chunkXlsx(item.data)
                elif mimeType == "application/vnd.openxmlformats-officedocument.presentationml.presentation":
                    chunks = await self._chunkPptx(item.data)
else:
# Binary data - no chunking
chunks = [item.data]
# Process each chunk
chunkResults = []
for chunk in chunks:
# Process with AI based on content type
try:
logger.debug(f"AI processing chunk with MIME type: {mimeType}")
if mimeType.startswith('image/'):
# For images, use image AI service with base64 data
# chunk is already base64 encoded string from _processImage
# Use the original prompt directly for images (no content embedding)
logger.debug(f"Calling image AI service for MIME type: {mimeType}")
processedContent = await self._serviceCenter.callAiImageBasic(prompt, chunk, mimeType)
else:
# For text content, use text AI service
# Neutralize content if neutralizer is enabled (only for text)
contentToProcess = chunk
if self._neutralizer and contentToProcess:
contentToProcess = self._neutralizer.neutralize(contentToProcess)
# Create AI prompt for text content
                            aiPrompt = (
                                "Extract relevant information from this content based on the following prompt:\n"
                                f"PROMPT: {prompt}\n"
                                "CONTENT:\n"
                                f"{contentToProcess}\n"
                                "Return ONLY the extracted information in a clear, concise format."
                            )
logger.debug(f"Calling text AI service for MIME type: {mimeType}")
processedContent = await self._serviceCenter.callAiTextBasic(aiPrompt, contentToProcess)
chunkResults.append(processedContent)
except Exception as aiError:
logger.error(f"AI processing failed for chunk: {str(aiError)}")
# Fallback to original content
chunkResults.append(chunk)
# Combine chunk results
combinedResult = "\n".join(chunkResults)
# Update content with AI processed data
processedItems.append(ContentItem(
label=item.label,
data=combinedResult,
metadata=ContentMetadata(
size=len(combinedResult.encode('utf-8')),
pages=1,
mimeType="text/plain",
base64Encoded=False
)
))
except Exception as e:
logger.error(f"Error processing content chunk: {str(e)}")
# Add original content if processing fails
processedItems.append(item)
return processedItems
def _chunkText(self, content: str, mimeType: str) -> List[str]:
"""Chunk text content based on mime type"""
if mimeType == "text/plain":
return self._chunkPlainText(content)
elif mimeType == "text/csv":
return self._chunkCsv(content)
elif mimeType == "application/json":
return self._chunkJson(content)
elif mimeType == "application/xml":
return self._chunkXml(content)
elif mimeType == "text/html":
return self._chunkHtml(content)
else:
return self._chunkPlainText(content)
def _chunkPlainText(self, content: str) -> List[str]:
"""Chunk plain text content"""
chunks = []
currentChunk = []
currentSize = 0
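        # Greedy line-based packing: accumulate lines until the next one would
        # push the chunk past the byte budget, then start a new chunk.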
for line in content.split('\n'):
lineSize = len(line.encode('utf-8'))
if currentSize + lineSize > self.chunkSizes["plain"]:
if currentChunk:
chunks.append('\n'.join(currentChunk))
currentChunk = [line]
currentSize = lineSize
else:
currentChunk.append(line)
currentSize += lineSize
if currentChunk:
chunks.append('\n'.join(currentChunk))
return chunks
def _chunkCsv(self, content: str) -> List[str]:
"""Chunk CSV content"""
chunks = []
currentChunk = []
currentSize = 0
for line in content.split('\n'):
lineSize = len(line.encode('utf-8'))
if currentSize + lineSize > self.chunkSizes["csv"]:
if currentChunk:
chunks.append('\n'.join(currentChunk))
currentChunk = [line]
currentSize = lineSize
else:
currentChunk.append(line)
currentSize += lineSize
if currentChunk:
chunks.append('\n'.join(currentChunk))
return chunks
def _chunkJson(self, content: str) -> List[str]:
"""Chunk JSON content"""
try:
data = json.loads(content)
chunks = []
currentChunk = []
currentSize = 0
            def processValue(value, path=""):
                nonlocal currentChunk, currentSize
                # Keep dict keys alongside their values so they survive chunking
                entry = {path: value} if path else value
                valueSize = len(json.dumps(entry).encode('utf-8'))
                if currentSize + valueSize > self.chunkSizes["json"]:
                    if currentChunk:
                        chunks.append(json.dumps(currentChunk))
                    currentChunk = [entry]
                    currentSize = valueSize
                else:
                    currentChunk.append(entry)
                    currentSize += valueSize
            if isinstance(data, list):
                for item in data:
                    processValue(item)
            elif isinstance(data, dict):
                for key, value in data.items():
                    processValue(value, key)
            else:
                processValue(data)
if currentChunk:
chunks.append(json.dumps(currentChunk))
return chunks
except json.JSONDecodeError:
return [content]
def _chunkXml(self, content: str) -> List[str]:
"""Chunk XML content"""
try:
root = ET.fromstring(content)
chunks = []
currentChunk = []
currentSize = 0
            def processElement(element):
nonlocal currentChunk, currentSize
elementStr = ET.tostring(element, encoding='unicode')
elementSize = len(elementStr.encode('utf-8'))
if currentSize + elementSize > self.chunkSizes["xml"]:
if currentChunk:
chunks.append(''.join(currentChunk))
currentChunk = [elementStr]
currentSize = elementSize
else:
currentChunk.append(elementStr)
currentSize += elementSize
for child in root:
processElement(child)
if currentChunk:
chunks.append(''.join(currentChunk))
return chunks
except ET.ParseError:
return [content]
def _chunkHtml(self, content: str) -> List[str]:
"""Chunk HTML content"""
try:
soup = BeautifulSoup(content, 'html.parser')
chunks = []
currentChunk = []
currentSize = 0
def processElement(element):
nonlocal currentChunk, currentSize
elementStr = str(element)
elementSize = len(elementStr.encode('utf-8'))
if currentSize + elementSize > self.chunkSizes["html"]:
if currentChunk:
chunks.append(''.join(currentChunk))
currentChunk = [elementStr]
currentSize = elementSize
else:
currentChunk.append(elementStr)
currentSize += elementSize
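            # Note: find_all also returns nested matches (a <p> inside a matched
            # <div> appears in both), so chunks may contain overlapping markup.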
for element in soup.find_all(['p', 'div', 'section', 'article']):
processElement(element)
if currentChunk:
chunks.append(''.join(currentChunk))
return chunks
except Exception:
return [content]
def _chunkBinary(self, content: str) -> List[str]:
"""Chunk binary content"""
try:
binaryData = base64.b64decode(content)
chunks = []
chunkSize = self.chunkSizes["binary"]
for i in range(0, len(binaryData), chunkSize):
chunk = binaryData[i:i + chunkSize]
chunks.append(base64.b64encode(chunk).decode('utf-8'))
return chunks
except Exception:
return [content]
async def _chunkPdf(self, content: str) -> List[str]:
"""Chunk PDF content"""
        try:
            self._loadPdfExtractor()  # ensure PyPDF2 is imported if chunking runs before extraction
            pdfData = base64.b64decode(content)
            chunks = []
with io.BytesIO(pdfData) as pdfStream:
pdfReader = PyPDF2.PdfReader(pdfStream)
for pageNum in range(len(pdfReader.pages)):
page = pdfReader.pages[pageNum]
pageText = page.extract_text()
if pageText:
chunks.append(pageText)
return chunks
except Exception:
return [content]
async def _chunkDocx(self, content: str) -> List[str]:
"""Chunk Word document content"""
        try:
            self._loadOfficeExtractor()  # ensure python-docx is imported
            docxData = base64.b64decode(content)
            chunks = []
with io.BytesIO(docxData) as docxStream:
doc = docx.Document(docxStream)
for para in doc.paragraphs:
chunks.append(para.text)
for table in doc.tables:
for row in table.rows:
rowText = []
for cell in row.cells:
rowText.append(cell.text)
chunks.append(" | ".join(rowText))
return chunks
except Exception:
return [content]
async def _chunkXlsx(self, content: str) -> List[str]:
"""Chunk Excel document content"""
        try:
            self._loadOfficeExtractor()  # ensure openpyxl is imported
            xlsxData = base64.b64decode(content)
            chunks = []
with io.BytesIO(xlsxData) as xlsxStream:
workbook = openpyxl.load_workbook(xlsxStream, data_only=True)
for sheetName in workbook.sheetnames:
sheet = workbook[sheetName]
for row in sheet.iter_rows():
rowText = []
for cell in row:
value = cell.value
if value is None:
rowText.append("")
else:
rowText.append(str(value).replace('"', '""'))
chunks.append(','.join(f'"{cell}"' for cell in rowText))
return chunks
except Exception:
return [content]
    async def _chunkPptx(self, content: str) -> List[str]:
        """Chunk PowerPoint document content.

        Placeholder: real PPTX parsing would need a dedicated library such as
        python-pptx; until then the content is passed through as a single chunk.
        """
        return [content]
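

if __name__ == "__main__":
    # Minimal smoke test, a sketch only: processes an in-memory text file
    # without an AI prompt, so no serviceCenter is required.
    import asyncio

    async def _demo():
        processor = DocumentProcessor()
        extracted = await processor.processFileData(b"hello world", "demo.txt", "text/plain")
        # Assumes ExtractedContent exposes .contents, mirroring the constructor above
        for item in extracted.contents:
            print(item.label, item.metadata.size)

    asyncio.run(_demo())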