from typing import Dict, Any, List, Optional, Union, Tuple, TypedDict, Callable, Awaitable
import logging
import json
import os
import io
import base64
from datetime import datetime, UTC
from pathlib import Path
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
import uuid

from .documentUtility import (
    getFileExtension,
    getMimeTypeFromExtension,
    detectMimeTypeFromContent,
    detectMimeTypeFromData,
    convertDocumentDataToString
)
from modules.interfaces.interfaceChatModel import (
    ExtractedContent,
    ContentItem,
    ContentMetadata
)
from modules.neutralizer.neutralizer import DataAnonymizer
from modules.shared.configuration import APP_CONFIG

logger = logging.getLogger(__name__)

# Optional imports - only loaded when needed
pdfExtractorLoaded = False
officeExtractorLoaded = False
imageProcessorLoaded = False


class FileProcessingError(Exception):
    """Custom exception for file processing errors."""
    pass


class DocumentExtraction:
    """Processor for handling document operations and content extraction."""

    def __init__(self, serviceCenter=None):
        """Initialize the document processor."""
        self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None
        self._serviceCenter = serviceCenter
        self.supportedTypes: Dict[str, Callable[[bytes, str, str], Awaitable[List[ContentItem]]]] = {
            # Text and data files
            'text/plain': self._processText,
            'text/csv': self._processCsv,
            'application/json': self._processJson,
            'application/xml': self._processXml,
            'text/html': self._processHtml,
            'image/svg+xml': self._processSvg,
            # Programming languages
            'application/javascript': self._processText,
            'application/typescript': self._processText,
            'text/jsx': self._processText,
            'text/tsx': self._processText,
            'text/x-python': self._processText,
            'text/x-java-source': self._processText,
            'text/x-c': self._processText,
            'text/x-c++src': self._processText,
            'text/x-c++hdr': self._processText,
            'text/x-csharp': self._processText,
            'application/x-httpd-php': self._processText,
            'text/x-ruby': self._processText,
            'text/x-go': self._processText,
            'text/x-rust': self._processText,
            'text/x-swift': self._processText,
            'text/x-kotlin': self._processText,
            'text/x-scala': self._processText,
            'text/x-r': self._processText,
            'text/x-matlab': self._processText,
            'text/x-perl': self._processText,
            'application/x-sh': self._processText,
            'application/x-powershell': self._processText,
            'application/x-msdos-program': self._processText,
            'text/vbscript': self._processText,
            'text/x-lua': self._processText,
            'application/sql': self._processText,
            'application/dart': self._processText,
            'text/x-elm': self._processText,
            'text/x-clojure': self._processText,
            'text/x-haskell': self._processText,
            'text/x-fsharp': self._processText,
            'text/x-ocaml': self._processText,
            # Web technologies
            'text/css': self._processText,
            'text/x-scss': self._processText,
            'text/x-sass': self._processText,
            'text/x-less': self._processText,
            'text/x-vue': self._processText,
            'text/x-svelte': self._processText,
            'text/x-astro': self._processText,
            # Configuration and build files
            'application/x-yaml': self._processText,
            'application/toml': self._processText,
            'text/x-dockerfile': self._processText,
            'text/x-makefile': self._processText,
            'text/x-cmake': self._processText,
            'text/x-gradle': self._processText,
            'text/x-maven': self._processText,
            # Documentation and markup
            'text/markdown': self._processText,
            'text/x-rst': self._processText,
            'application/x-tex': self._processText,
            'text/x-bibtex': self._processText,
            'text/asciidoc': self._processText,
            'text/x-wiki': self._processText,
            # Images
            'image/jpeg': self._processImage,
            'image/png': self._processImage,
            'image/gif': self._processImage,
            'image/webp': self._processImage,
            'image/bmp': self._processImage,
            'image/tiff': self._processImage,
            'image/x-icon': self._processImage,
            # Documents
            'application/pdf': self._processPdf,
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': self._processDocx,
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': self._processXlsx,
            'application/vnd.openxmlformats-officedocument.presentationml.presentation': self._processPptx,
            'application/vnd.oasis.opendocument.text': self._processText,
            'application/vnd.oasis.opendocument.spreadsheet': self._processText,
            'application/vnd.oasis.opendocument.presentation': self._processText,
            # Legacy Office formats
            'application/msword': self._processLegacyDoc,
            'application/vnd.ms-excel': self._processLegacyXls,
            'application/vnd.ms-powerpoint': self._processLegacyPpt
        }
        self.chunkSizes = {
            "text": 40000,             # General text content
            "plain": 40000,            # Plain text
            "csv": 40000,              # CSV data
            "json": 40000,             # JSON data
            "xml": 40000,              # XML data
            "html": 40000,             # HTML content
            "markdown": 40000,         # Markdown content
            "code": 80000,             # Programming code (increased for better preservation)
            "script": 80000,           # Script files (increased for better preservation)
            "javascript": 80000,       # JavaScript files specifically
            "typescript": 80000,       # TypeScript files specifically
            "config": 40000,           # Configuration files
            "image": 1024 * 1024,      # 1MB for images
            "video": 5 * 1024 * 1024,  # 5MB for video chunks
            "binary": 1024 * 1024,     # 1MB for binary data
            "pdf": 40000,              # PDF text content
            "docx": 40000,             # Word document text
            "xlsx": 40000,             # Excel data
            "pptx": 40000,             # PowerPoint text (required by _chunkPptx)
            "svg": 40000               # SVG content
        }

    def _robustTextDecode(self, fileData: bytes, filename: str = "unknown") -> str:
        """
        Robustly decode text data with multiple encoding fallbacks.

        Args:
            fileData: Raw bytes to decode
            filename: Filename for logging purposes

        Returns:
            Decoded text string
        """
        # Try strict decoding first, in order of likelihood. Note that
        # 'latin-1'/'iso-8859-1' and 'cp1252'/'windows-1252' are codec aliases,
        # and latin-1 accepts any byte sequence, so it cannot serve as a strict
        # attempt; lenient decoding is handled as the last resort below.
        for encoding in ('utf-8', 'cp1252'):
            try:
                return fileData.decode(encoding)
            except UnicodeDecodeError:
                continue

        try:
            # Try chardet for automatic detection
            import chardet
            detected = chardet.detect(fileData)
            if detected['encoding'] and detected['confidence'] > 0.7:
                return fileData.decode(detected['encoding'], errors='replace')
            # Last resort: decode with replacement characters
            content = fileData.decode('utf-8', errors='replace')
            logger.warning(f"{filename}: decoded with UTF-8 and replacement characters due to low encoding confidence")
            return content
        except ImportError:
            # chardet not available, use replacement characters
            content = fileData.decode('utf-8', errors='replace')
            logger.warning(f"{filename}: decoded with UTF-8 and replacement characters (chardet not available)")
            return content

    def initialize(self) -> None:
        """Initialize the document processor."""
        pass

    def _loadPdfExtractor(self):
        """Loads PDF extraction libraries when needed"""
        global pdfExtractorLoaded
        if not pdfExtractorLoaded:
            try:
                global PyPDF2, fitz
                import PyPDF2
                import fitz  # PyMuPDF for more extensive PDF processing
                pdfExtractorLoaded = True
                logger.debug("PDF extraction libraries successfully loaded")
            except ImportError as e:
                logger.warning(f"PDF extraction libraries could not be loaded: {e}")

    def _loadOfficeExtractor(self):
        """Loads Office document extraction libraries when needed"""
        global officeExtractorLoaded
        if not officeExtractorLoaded:
            try:
                global docx, openpyxl
                import docx      # python-docx for Word documents
                import openpyxl  # for Excel files
                officeExtractorLoaded = True
                logger.debug("Office extraction libraries successfully loaded")
            except ImportError as e:
                logger.warning(f"Office extraction libraries could not be loaded: {e}")

    def _loadImageProcessor(self):
        """Loads image processing libraries when needed"""
        global imageProcessorLoaded
        if not imageProcessorLoaded:
            try:
                global Image
                from PIL import Image
                imageProcessorLoaded = True
                logger.debug("Image processing libraries successfully loaded")
            except ImportError as e:
                logger.warning(f"Image processing libraries could not be loaded: {e}")

    async def processFileData(self, fileData: bytes, filename: str, mimeType: str,
                              base64Encoded: bool = False, prompt: Optional[str] = None,
                              documentId: Optional[str] = None, enableAI: bool = True) -> ExtractedContent:
        """
        Process file data directly and extract its contents with optional AI processing.

        Args:
            fileData: Raw file data as bytes
            filename: Name of the file
            mimeType: MIME type of the file
            base64Encoded: Whether the data is base64 encoded
            prompt: Prompt for AI content extraction
            documentId: Optional document ID
            enableAI: Whether to enable AI processing (default: True)

        Returns:
            ExtractedContent containing the processed content

        Raises:
            FileProcessingError: If document processing fails
        """
        try:
            # Decode base64 if needed
            if base64Encoded:
                fileData = base64.b64decode(fileData)

            # Use documentUtility for mime type detection
            if mimeType == "application/octet-stream":
                mimeType = detectMimeTypeFromData(fileData, filename, self._serviceCenter)

            # Process document based on type
            if mimeType not in self.supportedTypes:
                contentItems = await self._processBinary(fileData, filename, mimeType)
            else:
                processor = self.supportedTypes[mimeType]
                contentItems = await processor(fileData, filename, mimeType)

            # Process with AI if prompt provided and AI is enabled
            if enableAI and prompt and contentItems:
                try:
                    # Process each content item with AI
                    processedItems = await self._aiDataExtraction(contentItems, prompt)
                    contentItems = processedItems
                except Exception as e:
                    logger.error(f"Error processing content with AI: {str(e)}")
            elif not enableAI:
                logger.debug(f"AI processing disabled for {filename}, returning raw extracted content")

            return ExtractedContent(
                id=documentId if documentId else str(uuid.uuid4()),
                contents=contentItems
            )
        except Exception as e:
            logger.error(f"Error processing file data: {str(e)}")
            raise FileProcessingError(f"Failed to process file data: {str(e)}")

    async def _processText(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process text document with robust encoding detection and complete content extraction"""
        try:
            content = self._robustTextDecode(fileData, filename)

            # Validate that we got the complete content
            if not content or len(content.strip()) == 0:
                logger.warning(f"Empty content extracted from {filename}")
                return [ContentItem(
                    label="empty",
                    data="[Empty file or no readable content]",
                    metadata=ContentMetadata(
                        size=0,
                        pages=1,
                        mimeType="text/plain",
                        base64Encoded=False
                    )
                )]

            # Log content size for debugging
            content_size = len(content.encode('utf-8'))

            # Use documentUtility for mime type
            mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter)

            return [ContentItem(
                label="main",
                data=content,
                metadata=ContentMetadata(
                    size=content_size,
                    pages=1,
                    mimeType=mime_type,
                    base64Encoded=False
                )
            )]
        except Exception as e:
            logger.error(f"Error processing text document: {str(e)}")
            raise FileProcessingError(f"Failed to process text document: {str(e)}")

    async def _processCsv(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process CSV document with robust encoding detection"""
        try:
            content = self._robustTextDecode(fileData, filename)
            mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter)
            return [ContentItem(
                label="main",
                data=content,
                metadata=ContentMetadata(
                    size=len(content.encode('utf-8')),
                    pages=1,
                    mimeType=mime_type,
                    base64Encoded=False
                )
            )]
        except Exception as e:
            logger.error(f"Error processing CSV document: {str(e)}")
            raise FileProcessingError(f"Failed to process CSV document: {str(e)}")

    async def _processJson(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process JSON document with robust encoding detection"""
        try:
            content = self._robustTextDecode(fileData, filename)
            jsonData = json.loads(content)  # validates the JSON; raises on malformed input
            mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter)
            return [ContentItem(
                label="main",
                data=content,
                metadata=ContentMetadata(
                    size=len(content.encode('utf-8')),
                    pages=1,
                    mimeType=mime_type,
                    base64Encoded=False
                )
            )]
        except Exception as e:
            logger.error(f"Error processing JSON document: {str(e)}")
            raise FileProcessingError(f"Failed to process JSON document: {str(e)}")

    async def _processXml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process XML document with robust encoding detection"""
        try:
            content = self._robustTextDecode(fileData, filename)
            mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter)
            return [ContentItem(
                label="main",
                data=content,
                metadata=ContentMetadata(
                    size=len(content.encode('utf-8')),
                    pages=1,
                    mimeType=mime_type,
                    base64Encoded=False
                )
            )]
        except Exception as e:
            logger.error(f"Error processing XML document: {str(e)}")
            raise FileProcessingError(f"Failed to process XML document: {str(e)}")

    async def _processHtml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process HTML document with robust encoding detection"""
        try:
            content = self._robustTextDecode(fileData, filename)
            mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter)
            return [ContentItem(
                label="main",
                data=content,
                metadata=ContentMetadata(
                    size=len(content.encode('utf-8')),
                    pages=1,
                    mimeType=mime_type,
                    base64Encoded=False
                )
            )]
        except Exception as e:
            logger.error(f"Error processing HTML document: {str(e)}")
            raise FileProcessingError(f"Failed to process HTML document: {str(e)}")

    async def _processSvg(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process SVG document with robust encoding detection and meaningful content extraction"""
        try:
            content = self._robustTextDecode(fileData, filename)
            # Check if it's actually SVG content; the raw markup is returned either way
            if "<svg" not in content.lower():
                logger.warning(f"{filename} does not appear to contain an <svg> element")
            return [ContentItem(
                label="main",
                data=content,
                metadata=ContentMetadata(
                    size=len(content.encode('utf-8')),
                    pages=1,
                    mimeType="image/svg+xml",
                    base64Encoded=False
                )
            )]
        except Exception as e:
            logger.error(f"Error processing SVG document: {str(e)}")
            raise FileProcessingError(f"Failed to process SVG document: {str(e)}")

    async def _processImage(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process image document"""
        try:
            self._loadImageProcessor()
            if not imageProcessorLoaded:
                raise FileProcessingError("Image processing libraries not available")

            with io.BytesIO(fileData) as imgStream:
                img = Image.open(imgStream)

                # For GIF files, provide descriptive information instead of AI processing
                if mimeType == "image/gif":
                    try:
                        frame_count = getattr(img, 'n_frames', 1)
                        duration = img.info.get('duration', 0)  # Pillow stores GIF frame duration in img.info

                        # Create a descriptive text about the GIF
                        gif_description = f"GIF Image Analysis:\n"
                        gif_description += f"- Dimensions: {img.width} x {img.height} pixels\n"
                        gif_description += f"- Frame count: {frame_count}\n"
                        gif_description += f"- Color mode: {img.mode}\n"
                        if duration > 0:
                            gif_description += f"- Duration: {duration}ms\n"
                        gif_description += f"- File size: {len(fileData)} bytes\n"
                        gif_description += f"- Format: {img.format}\n\n"
                        gif_description += f"Note: This is an animated GIF image. The AI cannot directly analyze image content, but the file contains {frame_count} frame(s) of animation."
return [ContentItem( label="gif_analysis", data=gif_description, metadata=ContentMetadata( size=len(gif_description.encode('utf-8')), width=img.width, height=img.height, colorMode=img.mode, mimeType="text/plain", base64Encoded=False ) )] except Exception as gifError: logger.warning(f"GIF processing failed: {str(gifError)}") # Fallback to basic description pass metadata = ContentMetadata( size=len(fileData), width=img.width, height=img.height, colorMode=img.mode, mimeType=mimeType, base64Encoded=True ) # Convert image to base64 for storage imgStream.seek(0) imgData = base64.b64encode(imgStream.read()).decode('utf-8') return [ContentItem( label="image", data=imgData, metadata=metadata )] except Exception as e: logger.error(f"Error processing image document: {str(e)}") raise FileProcessingError(f"Failed to process image document: {str(e)}") async def _processPdf(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: """Process PDF document""" try: self._loadPdfExtractor() if not pdfExtractorLoaded: raise FileProcessingError("PDF extraction libraries not available") contentItems = [] with io.BytesIO(fileData) as pdfStream: # Extract text with PyPDF2 pdfReader = PyPDF2.PdfReader(pdfStream) metadata = ContentMetadata( size=len(fileData), pages=len(pdfReader.pages), mimeType="application/pdf", base64Encoded=False ) # Extract text from all pages for pageNum in range(len(pdfReader.pages)): page = pdfReader.pages[pageNum] pageText = page.extract_text() if pageText: contentItems.append(ContentItem( label=f"page_{pageNum + 1}", data=pageText, metadata=ContentMetadata( size=len(pageText.encode('utf-8')), pages=1, mimeType="text/plain", base64Encoded=False ) )) # Extract images with PyMuPDF pdfStream.seek(0) doc = fitz.open(stream=pdfStream, filetype="pdf") for pageNum in range(len(doc)): page = doc[pageNum] for imgIndex, imgInfo in enumerate(page.get_images(full=True)): try: xref = imgInfo[0] baseImage = doc.extract_image(xref) if baseImage: imageBytes = baseImage.get("image", b"") imageExt = baseImage.get("ext", "png") if imageBytes: contentItems.append(ContentItem( label=f"image_{pageNum + 1}_{imgIndex}", data=base64.b64encode(imageBytes).decode('utf-8'), metadata=ContentMetadata( size=len(imageBytes), pages=1, mimeType=f"image/{imageExt}", base64Encoded=True ) )) except Exception as imgE: logger.warning(f"Error extracting image {imgIndex} on page {pageNum + 1}: {str(imgE)}") doc.close() return contentItems except Exception as e: logger.error(f"Error processing PDF document: {str(e)}") raise FileProcessingError(f"Failed to process PDF document: {str(e)}") async def _processDocx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: """Process Word document with enhanced formatting preservation""" try: self._loadOfficeExtractor() if not officeExtractorLoaded: raise FileProcessingError("Office extraction libraries not available") contentItems = [] with io.BytesIO(fileData) as docxStream: doc = docx.Document(docxStream) # Extract document properties doc_properties = [] if doc.core_properties.title: doc_properties.append(f"Title: {doc.core_properties.title}") if doc.core_properties.author: doc_properties.append(f"Author: {doc.core_properties.author}") if doc.core_properties.subject: doc_properties.append(f"Subject: {doc.core_properties.subject}") if doc.core_properties.keywords: doc_properties.append(f"Keywords: {doc.core_properties.keywords}") if doc.core_properties.comments: doc_properties.append(f"Comments: {doc.core_properties.comments}") # Extract main content 
                main_content = []

                # Process paragraphs with formatting
                for para in doc.paragraphs:
                    if para.text.strip():
                        # Get paragraph style
                        style_name = para.style.name if para.style else "Normal"

                        # Check for heading styles
                        if style_name.startswith('Heading'):
                            level = style_name.replace('Heading ', '')
                            # Guard against non-numeric heading styles (e.g. plain "Heading")
                            prefix = '#' * int(level) if level.isdigit() else '#'
                            main_content.append(f"\n{prefix} {para.text}")
                        else:
                            # Check for bold, italic, underline formatting
                            formatted_text = para.text
                            if para.runs:
                                # Process individual runs for formatting
                                run_texts = []
                                for run in para.runs:
                                    run_text = run.text
                                    if run.bold:
                                        run_text = f"**{run_text}**"
                                    if run.italic:
                                        run_text = f"*{run_text}*"
                                    if run.underline:
                                        run_text = f"__{run_text}__"
                                    run_texts.append(run_text)
                                formatted_text = ''.join(run_texts)
                            main_content.append(formatted_text)

                # Extract tables with better formatting
                table_count = 0
                for table in doc.tables:
                    table_count += 1
                    main_content.append(f"\n\n--- Table {table_count} ---")

                    # Get table headers (first row)
                    if table.rows:
                        header_row = table.rows[0]
                        headers = [cell.text.strip() for cell in header_row.cells]
                        main_content.append("| " + " | ".join(headers) + " |")
                        main_content.append("|" + "|".join(["---"] * len(headers)) + "|")

                        # Process data rows
                        for row in table.rows[1:]:
                            row_data = [cell.text.strip() for cell in row.cells]
                            main_content.append("| " + " | ".join(row_data) + " |")
                    main_content.append("--- End Table ---\n")

                # Extract headers and footers if available
                try:
                    # Check for headers and footers in sections
                    for section in doc.sections:
                        # Header
                        if section.header:
                            header_text = []
                            for para in section.header.paragraphs:
                                if para.text.strip():
                                    header_text.append(f"[Header] {para.text}")
                            if header_text:
                                main_content.insert(0, "\n".join(header_text) + "\n")
                        # Footer
                        if section.footer:
                            footer_text = []
                            for para in section.footer.paragraphs:
                                if para.text.strip():
                                    footer_text.append(f"[Footer] {para.text}")
                            if footer_text:
                                main_content.append("\n" + "\n".join(footer_text))
                except Exception as header_footer_error:
                    logger.debug(f"Could not extract headers/footers: {header_footer_error}")

                # Extract comments if available
                try:
                    comments = []
                    for comment in doc.part.comments_part.comments if doc.part.comments_part else []:
                        comment_text = comment.text.strip()
                        if comment_text:
                            comments.append(f"[Comment] {comment_text}")
                    if comments:
                        main_content.append("\n\n--- Comments ---")
                        main_content.extend(comments)
                        main_content.append("--- End Comments ---")
                except Exception as comment_error:
                    logger.debug(f"Could not extract comments: {comment_error}")

                # Combine all content
                if doc_properties:
                    main_content.insert(0, "--- Document Properties ---\n" + "\n".join(doc_properties) + "\n--- End Properties ---\n")

                final_content = "\n".join(main_content)

                # Create main content item
                contentItems.append(ContentItem(
                    label="main",
                    data=final_content,
                    metadata=ContentMetadata(
                        size=len(final_content.encode('utf-8')),
                        pages=len(doc.paragraphs),
                        mimeType="text/markdown",  # Use markdown for better formatting
                        base64Encoded=False
                    )
                ))

                # Create separate content item for tables only (if tables exist)
                if table_count > 0:
                    table_content = []
                    for i, table in enumerate(doc.tables):
                        table_content.append(f"Table {i+1}:")
                        if table.rows:
                            # CSV format for tables
                            for row in table.rows:
                                row_data = [f'"{cell.text.strip()}"' for cell in row.cells]
                                table_content.append(",".join(row_data))
                        table_content.append("")  # Empty line between tables

                    table_text = "\n".join(table_content)
                    contentItems.append(ContentItem(
                        label="tables",
                        data=table_text,
                        metadata=ContentMetadata(
                            size=len(table_text.encode('utf-8')),
                            pages=1,
                            mimeType="text/csv",
                            base64Encoded=False
                        )
                    ))

                # Create separate content item for document structure
                structure_info = []
                structure_info.append(f"Document Structure:")
                structure_info.append(f"- Paragraphs: {len(doc.paragraphs)}")
                structure_info.append(f"- Tables: {table_count}")
                structure_info.append(f"- Sections: {len(doc.sections)}")

                # Count different paragraph styles
                style_counts = {}
                for para in doc.paragraphs:
                    style_name = para.style.name if para.style else "Normal"
                    style_counts[style_name] = style_counts.get(style_name, 0) + 1
                for style, count in style_counts.items():
                    structure_info.append(f"- {style}: {count}")

                structure_text = "\n".join(structure_info)
                contentItems.append(ContentItem(
                    label="structure",
                    data=structure_text,
                    metadata=ContentMetadata(
                        size=len(structure_text.encode('utf-8')),
                        pages=1,
                        mimeType="text/plain",
                        base64Encoded=False
                    )
                ))

            return contentItems
        except Exception as e:
            logger.error(f"Error processing Word document: {str(e)}")
            raise FileProcessingError(f"Failed to process Word document: {str(e)}")

    async def _processXlsx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process Excel document with enhanced table extraction and metadata"""
        try:
            self._loadOfficeExtractor()
            if not officeExtractorLoaded:
                raise FileProcessingError("Office extraction libraries not available")

            contentItems = []
            with io.BytesIO(fileData) as xlsxStream:
                try:
                    workbook = openpyxl.load_workbook(xlsxStream, data_only=True)
                except Exception as load_error:
                    logger.error(f"Failed to load Excel workbook {filename}: {str(load_error)}")
                    raise FileProcessingError(f"Failed to load Excel workbook: {str(load_error)}")

                # Extract workbook properties safely
                workbook_props = []
                try:
                    if hasattr(workbook, 'properties'):
                        props = workbook.properties

                        # Log all available attributes for debugging
                        for attr in dir(props):
                            if not attr.startswith('_'):  # Skip private attributes
                                try:
                                    value = getattr(props, attr)
                                    if value is not None:
                                        logger.debug(f"Workbook property {attr}: {value}")
                                except Exception as attr_error:
                                    logger.debug(f"Could not read property {attr}: {str(attr_error)}")

                        # Check each property safely before accessing
                        if hasattr(props, 'title') and props.title:
                            workbook_props.append(f"Title: {props.title}")
                        if hasattr(props, 'creator') and props.creator:  # 'creator' is the correct attribute
                            workbook_props.append(f"Author: {props.creator}")
                        if hasattr(props, 'subject') and props.subject:
                            workbook_props.append(f"Subject: {props.subject}")
                        if hasattr(props, 'keywords') and props.keywords:
                            workbook_props.append(f"Keywords: {props.keywords}")
                        if hasattr(props, 'comments') and props.comments:
                            workbook_props.append(f"Comments: {props.comments}")
                        if hasattr(props, 'category') and props.category:
                            workbook_props.append(f"Category: {props.category}")
                        if hasattr(props, 'description') and props.description:
                            workbook_props.append(f"Description: {props.description}")
                        if hasattr(props, 'lastModifiedBy') and props.lastModifiedBy:
                            workbook_props.append(f"Last Modified By: {props.lastModifiedBy}")
                        if hasattr(props, 'created') and props.created:
                            workbook_props.append(f"Created: {props.created}")
                        if hasattr(props, 'modified') and props.modified:
                            workbook_props.append(f"Modified: {props.modified}")

                        # Try alternative property names that might exist
                        if hasattr(props, 'author') and props.author:  # Some versions use 'author'
                            workbook_props.append(f"Author (alt): {props.author}")
                        if hasattr(props, 'manager') and props.manager:
                            workbook_props.append(f"Manager: {props.manager}")
                        if hasattr(props, 'company') and props.company:
                            workbook_props.append(f"Company: {props.company}")
                        if hasattr(props, 'status') and props.status:
                            workbook_props.append(f"Status: {props.status}")
                        if hasattr(props, 'revision') and props.revision:
                            workbook_props.append(f"Revision: {props.revision}")
                    else:
                        # Try to find properties in other locations
                        for attr in dir(workbook):
                            if not attr.startswith('_') and 'prop' in attr.lower():
                                logger.debug(f"Workbook exposes property attribute: {attr}")
                except Exception as props_error:
                    logger.warning(f"Could not extract workbook properties: {str(props_error)}")
                    workbook_props = []

                # Create workbook overview content item
                overview_content = []
                overview_content.append("Excel Workbook Overview")
                overview_content.append("=" * 30)
                overview_content.append(f"Total Sheets: {len(workbook.sheetnames)}")
                overview_content.append(f"Sheet Names: {', '.join(workbook.sheetnames)}")
                if workbook_props:
                    overview_content.append("\nWorkbook Properties:")
                    overview_content.extend(workbook_props)

                overview_text = "\n".join(overview_content)
                contentItems.append(ContentItem(
                    label="overview",
                    data=overview_text,
                    metadata=ContentMetadata(
                        size=len(overview_text.encode('utf-8')),
                        pages=1,
                        mimeType="text/plain",
                        base64Encoded=False
                    )
                ))

                # Process each sheet
                for sheetIndex, sheetName in enumerate(workbook.sheetnames):
                    try:
                        sheet = workbook[sheetName]
                        logger.debug(f"Processing sheet {sheetIndex + 1}: {sheetName}")

                        # Get sheet metadata
                        sheet_metadata = []
                        sheet_metadata.append(f"Sheet: {sheetName}")
                        try:
                            sheet_metadata.append(f"Dimensions: {sheet.dimensions}")
                            sheet_metadata.append(f"Max Row: {sheet.max_row}")
                            sheet_metadata.append(f"Max Column: {sheet.max_column}")
                        except Exception as dim_error:
                            logger.warning(f"Could not get sheet dimensions for {sheetName}: {str(dim_error)}")
                            sheet_metadata.append("Dimensions: Unable to determine")
                            sheet_metadata.append("Max Row: Unknown")
                            sheet_metadata.append("Max Column: Unknown")

                        # Check for sheet properties safely
                        try:
                            if hasattr(sheet, 'sheet_properties'):
                                sheet_props = sheet.sheet_properties
                                if hasattr(sheet_props, 'tabColor') and sheet_props.tabColor:
                                    sheet_metadata.append(f"Tab Color: {sheet_props.tabColor}")
                                if hasattr(sheet_props, 'hidden') and sheet_props.hidden:
                                    sheet_metadata.append("Hidden: Yes")
                                if hasattr(sheet_props, 'name') and sheet_props.name:
                                    sheet_metadata.append(f"Internal Name: {sheet_props.name}")
                        except Exception as sheet_props_error:
                            logger.debug(f"Could not extract sheet properties for {sheetName}: {str(sheet_props_error)}")

                        # Extract data from sheet
                        sheet_data = []
                        try:
                            # Find the actual data range (skip empty rows/columns)
                            min_row = sheet.min_row
                            max_row = sheet.max_row
                            min_col = sheet.min_column
                            max_col = sheet.max_column

                            # Adjust for empty sheets
                            if max_row == 0 or max_col == 0:
                                sheet_metadata.append("Content: Empty sheet")
                                sheet_data.append("(Empty sheet)")
                            else:
                                # Extract all data with proper CSV formatting
                                for row_num in range(min_row, max_row + 1):
                                    row_data = []
                                    for col_num in range(min_col, max_col + 1):
                                        try:
                                            cell = sheet.cell(row=row_num, column=col_num)
                                            cell_value = cell.value

                                            # Handle different data types
                                            if cell_value is None:
                                                row_data.append("")
                                            elif isinstance(cell_value, (int, float)):
                                                row_data.append(str(cell_value))
                                            elif isinstance(cell_value, datetime):
                                                row_data.append(cell_value.strftime("%Y-%m-%d %H:%M:%S"))
                                            else:
                                                # Escape quotes and wrap in quotes for CSV
                                                cell_str = str(cell_value).replace('"', '""')
                                                row_data.append(f'"{cell_str}"')
                                        except Exception as cell_error:
                                            logger.debug(f"Error processing cell at row {row_num}, col {col_num}: {str(cell_error)}")
                                            row_data.append("(Error reading cell)")
sheet_data.append(",".join(row_data)) sheet_metadata.append(f"Data Rows: {len(sheet_data)}") sheet_metadata.append(f"Data Columns: {max_col - min_col + 1}") except Exception as data_error: logger.warning(f"Could not extract data from sheet {sheetName}: {str(data_error)}") sheet_metadata.append("Content: Error extracting data") sheet_data.append(f"(Error: {str(data_error)})") # Create sheet content item sheet_content = "\n".join(sheet_metadata) + "\n\n" + "\n".join(sheet_data) contentItems.append(ContentItem( label=f"sheet_{sheetIndex + 1}_{sheetName}", data=sheet_content, metadata=ContentMetadata( size=len(sheet_content.encode('utf-8')), pages=1, mimeType="text/csv", base64Encoded=False ) )) # Create separate CSV file for each sheet (clean format) if sheet_data and sheet_data[0].strip() and not sheet_data[0].startswith("(Error"): # Create clean CSV without metadata csv_content = "\n".join(sheet_data) contentItems.append(ContentItem( label=f"csv_{sheetIndex + 1}_{sheetName}", data=csv_content, metadata=ContentMetadata( size=len(csv_content.encode('utf-8')), pages=1, mimeType="text/csv", base64Encoded=False ) )) except Exception as sheet_error: logger.error(f"Error processing sheet {sheetName}: {str(sheet_error)}") # Create error content item for this sheet error_content = f"Error processing sheet: {sheetName}\nError: {str(sheet_error)}" contentItems.append(ContentItem( label=f"error_sheet_{sheetIndex + 1}_{sheetName}", data=error_content, metadata=ContentMetadata( size=len(error_content.encode('utf-8')), pages=1, mimeType="text/plain", base64Encoded=False ) )) # Create summary content item try: summary_content = [] summary_content.append("Excel Processing Summary") summary_content.append("=" * 30) summary_content.append(f"Total Sheets Processed: {len(workbook.sheetnames)}") total_rows = 0 total_cells = 0 for sheetName in workbook.sheetnames: try: sheet = workbook[sheetName] if hasattr(sheet, 'max_row') and hasattr(sheet, 'max_column'): if sheet.max_row > 0 and sheet.max_column > 0: sheet_rows = sheet.max_row sheet_cells = sheet.max_row * sheet.max_column total_rows += sheet_rows total_cells += sheet_cells summary_content.append(f"- {sheetName}: {sheet_rows} rows, {sheet_cells} cells") except Exception as summary_error: logger.debug(f"Could not get summary for sheet {sheetName}: {str(summary_error)}") summary_content.append(f"- {sheetName}: Error getting summary") summary_content.append(f"\nTotal Rows: {total_rows}") summary_content.append(f"Total Cells: {total_cells}") summary_text = "\n".join(summary_content) contentItems.append(ContentItem( label="summary", data=summary_text, metadata=ContentMetadata( size=len(summary_text.encode('utf-8')), pages=1, mimeType="text/plain", base64Encoded=False ) )) except Exception as summary_error: logger.warning(f"Could not create summary: {str(summary_error)}") return contentItems except Exception as e: logger.error(f"Error processing Excel document: {str(e)}") raise FileProcessingError(f"Failed to process Excel document: {str(e)}") async def _processLegacyDoc(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: """Process legacy Word .doc document""" try: # Try to use antiword or similar tools for .doc files # For now, we'll provide a basic binary extraction with metadata contentItems = [] # Create a basic content item explaining the limitation info_content = f"""Legacy Word Document (.doc) - {filename} Note: This is a legacy .doc format file. For better content extraction, consider converting to .docx format. 
File size: {len(fileData)} bytes
Format: Microsoft Word 97-2003 Document

Content extraction from .doc files requires specialized tools like:
- antiword (Linux/Unix)
- catdoc (Linux/Unix)
- Microsoft Word (for conversion)

The raw binary content is available but not human-readable."""

            contentItems.append(ContentItem(
                label="info",
                data=info_content,
                metadata=ContentMetadata(
                    size=len(info_content.encode('utf-8')),
                    pages=1,
                    mimeType="text/plain",
                    base64Encoded=False
                )
            ))

            # Also provide the binary content for potential processing
            contentItems.append(ContentItem(
                label="binary",
                data=base64.b64encode(fileData).decode('utf-8'),
                metadata=ContentMetadata(
                    size=len(fileData),
                    mimeType=mimeType,
                    base64Encoded=True
                )
            ))

            return contentItems
        except Exception as e:
            logger.error(f"Error processing legacy Word document: {str(e)}")
            raise FileProcessingError(f"Failed to process legacy Word document: {str(e)}")

    async def _processLegacyXls(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process legacy Excel .xls document"""
        try:
            # Try to use xlrd or similar tools for .xls files
            # For now, we'll provide a basic binary extraction with metadata
            contentItems = []

            # Create a basic content item explaining the limitation
            info_content = f"""Legacy Excel Document (.xls) - {filename}

Note: This is a legacy .xls format file. For better content extraction, consider converting to .xlsx format.

File size: {len(fileData)} bytes
Format: Microsoft Excel 97-2003 Workbook

Content extraction from .xls files requires specialized tools like:
- xlrd (Python library)
- Microsoft Excel (for conversion)
- LibreOffice (for conversion)

The raw binary content is available but not human-readable."""

            contentItems.append(ContentItem(
                label="info",
                data=info_content,
                metadata=ContentMetadata(
                    size=len(info_content.encode('utf-8')),
                    pages=1,
                    mimeType="text/plain",
                    base64Encoded=False
                )
            ))

            # Also provide the binary content for potential processing
            contentItems.append(ContentItem(
                label="binary",
                data=base64.b64encode(fileData).decode('utf-8'),
                metadata=ContentMetadata(
                    size=len(fileData),
                    mimeType=mimeType,
                    base64Encoded=True
                )
            ))

            return contentItems
        except Exception as e:
            logger.error(f"Error processing legacy Excel document: {str(e)}")
            raise FileProcessingError(f"Failed to process legacy Excel document: {str(e)}")

    async def _processLegacyPpt(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process legacy PowerPoint .ppt document"""
        try:
            # Try to use python-pptx or similar tools for .ppt files
            # For now, we'll provide a basic binary extraction with metadata
            contentItems = []

            # Create a basic content item explaining the limitation
            info_content = f"""Legacy PowerPoint Document (.ppt) - {filename}

Note: This is a legacy .ppt format file. For better content extraction, consider converting to .pptx format.
File size: {len(fileData)} bytes
Format: Microsoft PowerPoint 97-2003 Presentation

Content extraction from .ppt files requires specialized tools like:
- python-pptx (limited support for .ppt)
- Microsoft PowerPoint (for conversion)
- LibreOffice (for conversion)

The raw binary content is available but not human-readable."""

            contentItems.append(ContentItem(
                label="info",
                data=info_content,
                metadata=ContentMetadata(
                    size=len(info_content.encode('utf-8')),
                    pages=1,
                    mimeType="text/plain",
                    base64Encoded=False
                )
            ))

            # Also provide the binary content for potential processing
            contentItems.append(ContentItem(
                label="binary",
                data=base64.b64encode(fileData).decode('utf-8'),
                metadata=ContentMetadata(
                    size=len(fileData),
                    mimeType=mimeType,
                    base64Encoded=True
                )
            ))

            return contentItems
        except Exception as e:
            logger.error(f"Error processing legacy PowerPoint document: {str(e)}")
            raise FileProcessingError(f"Failed to process legacy PowerPoint document: {str(e)}")

    async def _processPptx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process PowerPoint document"""
        try:
            self._loadOfficeExtractor()
            if not officeExtractorLoaded:
                raise FileProcessingError("Office extraction libraries not available")

            contentItems = []
            try:
                # Try to use python-pptx for PowerPoint processing
                from pptx import Presentation

                with io.BytesIO(fileData) as pptxStream:
                    prs = Presentation(pptxStream)

                    for slideNum, slide in enumerate(prs.slides):
                        slideText = []

                        # Extract text from shapes; shape.text already covers the
                        # shape's text frame paragraphs, so one pass avoids duplicates
                        for shape in slide.shapes:
                            if hasattr(shape, "text") and shape.text:
                                slideText.append(shape.text)

                        if slideText:
                            content = "\n".join(slideText)
                            contentItems.append(ContentItem(
                                label=f"slide_{slideNum + 1}",
                                data=content,
                                metadata=ContentMetadata(
                                    size=len(content.encode('utf-8')),
                                    pages=1,
                                    mimeType="text/plain",
                                    base64Encoded=False
                                )
                            ))

                    if not contentItems:
                        # Fallback: treat as binary if no text extracted
                        contentItems.append(ContentItem(
                            label="presentation",
                            data=base64.b64encode(fileData).decode('utf-8'),
                            metadata=ContentMetadata(
                                size=len(fileData),
                                pages=len(prs.slides) if hasattr(prs, 'slides') else 1,
                                mimeType="application/vnd.openxmlformats-officedocument.presentationml.presentation",
                                base64Encoded=True
                            )
                        ))
            except ImportError:
                # python-pptx not available, treat as binary
                contentItems.append(ContentItem(
                    label="presentation",
                    data=base64.b64encode(fileData).decode('utf-8'),
                    metadata=ContentMetadata(
                        size=len(fileData),
                        pages=1,
                        mimeType="application/vnd.openxmlformats-officedocument.presentationml.presentation",
                        base64Encoded=True
                    )
                ))

            return contentItems
        except Exception as e:
            logger.error(f"Error processing PowerPoint document: {str(e)}")
            raise FileProcessingError(f"Failed to process PowerPoint document: {str(e)}")

    async def _processBinary(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]:
        """Process binary document"""
        try:
            return [ContentItem(
                label="binary",
                data=base64.b64encode(fileData).decode('utf-8'),
                metadata=ContentMetadata(
                    size=len(fileData),
                    mimeType=mimeType,
                    base64Encoded=True,
                    error="Unsupported file type"
                )
            )]
        except Exception as e:
            logger.error(f"Error processing binary document: {str(e)}")
            raise FileProcessingError(f"Failed to process binary document: {str(e)}")

    async def _aiDataExtraction(self, contentItems: List[ContentItem], prompt: str) -> List[ContentItem]:
        """
        Process content items with AI, handling chunking based on content type.

        Args:
            contentItems: List of content items to process
            prompt: Prompt for AI content extraction

        Returns:
            List of processed content items
        """
        processedItems = []
        for item in contentItems:
            try:
                # Get content type from metadata
                mimeType = item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain"

                # Chunk content based on type
                if mimeType.startswith('text/'):
                    chunks = self._chunkText(item.data, mimeType)
                elif mimeType == "image/svg+xml":
                    # SVG files are XML, treat as text
                    chunks = self._chunkXml(item.data)
                elif mimeType.startswith('image/'):
                    # Images should not be chunked - process as single unit
                    chunks = [item.data]
                elif mimeType == "application/pdf":
                    chunks = self._chunkPdf(item.data)
                elif mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                    chunks = self._chunkDocx(item.data)
                elif mimeType == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
                    chunks = self._chunkXlsx(item.data)
                elif mimeType.startswith('application/vnd.openxmlformats-officedocument.presentationml.presentation'):
                    chunks = self._chunkPptx(item.data)
                elif mimeType.startswith('text/x-') or mimeType.startswith('application/') and any(keyword in mimeType for keyword in ['script', 'code', 'source', 'yaml', 'toml', 'dockerfile', 'makefile', 'cmake', 'gradle', 'maven']):
                    # Programming languages, configuration files, and build files
                    chunks = self._chunkCode(item.data)
                else:
                    # Binary data - no chunking
                    chunks = [item.data]

                # Process each chunk
                chunkResults = []
                for chunk in chunks:
                    # Process with AI based on content type
                    try:
                        if mimeType.startswith('image/') and mimeType != "image/svg+xml":
                            # For images (excluding SVG), extract meaningful content as text.
                            # Use AI to analyze the image and extract relevant information.
                            # Create a specific prompt for image content extraction.
                            imagePrompt = f"""
Analyze this image and extract the actual content and information from it.
Focus on extracting text, data, charts, diagrams, or any meaningful content.
If there's text in the image, extract it. If there are charts or diagrams, describe the data.
Return the extracted content in a clear, structured text format.

Original prompt: {prompt}
"""
                            processedContent = await self._serviceCenter.callAiImageBasic(imagePrompt, chunk, mimeType)
                        else:
                            # For text content (including SVG), use text AI service.
                            # Neutralize content if neutralizer is enabled (only for text).
                            contentToProcess = chunk
                            if self._neutralizer and contentToProcess:
                                contentToProcess = self._neutralizer.neutralize(contentToProcess)

                            # Create AI prompt for text content
                            aiPrompt = f"""
Extract relevant information from this content based on the following prompt:

PROMPT: {prompt}

CONTENT:
{contentToProcess}

Return ONLY the extracted information in a clear, concise format.
""" # Special handling for JavaScript and other code files - preserve complete content if mimeType == "application/javascript" or mimeType == "application/typescript" or mimeType.startswith("text/x-") or any(keyword in mimeType for keyword in ['script', 'code', 'source']): # For code files, preserve the complete content without AI processing processedContent = contentToProcess else: processedContent = await self._serviceCenter.callAiTextBasic(aiPrompt, contentToProcess) chunkResults.append(processedContent) except Exception as aiError: logger.error(f"AI processing failed for chunk: {str(aiError)}") # For non-text content, don't fallback to binary data if mimeType.startswith('image/') or mimeType.startswith('video/') or mimeType.startswith('audio/'): logger.warning(f"Skipping binary content fallback for {mimeType}") continue # Skip this chunk entirely else: # Only fallback to original content for text-based formats chunkResults.append(chunk) # Combine chunk results if chunkResults: # For text content, combine all chunks if (mimeType.startswith('text/') or mimeType in ["application/pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.openxmlformats-officedocument.presentationml.presentation"] or mimeType.startswith('text/x-') or mimeType.startswith('application/') and any(keyword in mimeType for keyword in ['script', 'code', 'source', 'yaml', 'toml', 'dockerfile', 'makefile', 'cmake', 'gradle', 'maven', 'javascript', 'typescript', 'sql', 'dart'])): combinedResult = "\n".join(chunkResults) else: # For binary content, use the first result combinedResult = chunkResults[0] else: # No chunks processed, use original content combinedResult = item.data # Only add processed item if we have results if combinedResult and combinedResult.strip(): processedItems.append(ContentItem( label=item.label, data=combinedResult, metadata=ContentMetadata( size=len(combinedResult.encode('utf-8')), pages=item.metadata.pages if hasattr(item.metadata, 'pages') else 1, mimeType=item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain", base64Encoded=item.metadata.base64Encoded if hasattr(item.metadata, 'base64Encoded') else False ) )) else: logger.warning(f"No processed content available for {item.label}, skipping item") except Exception as e: logger.error(f"Error processing content chunk: {str(e)}") # Add original content if processing fails processedItems.append(item) return processedItems def _chunkText(self, content: str, mimeType: str) -> List[str]: """Chunk text content based on mime type""" if mimeType == "text/plain": return self._chunkPlainText(content) elif mimeType == "text/csv": return self._chunkCsv(content) elif mimeType == "application/json": return self._chunkJson(content) elif mimeType == "application/xml": return self._chunkXml(content) elif mimeType == "text/html": return self._chunkHtml(content) elif mimeType == "text/markdown" or mimeType == "text/x-rst" or mimeType == "text/x-wiki": return self._chunkMarkdown(content) elif mimeType == "application/javascript" or mimeType == "application/typescript": # JavaScript and TypeScript files get special handling return self._chunkJavaScript(content) elif mimeType.startswith("text/x-") or mimeType.startswith("application/") and any(keyword in mimeType for keyword in ['script', 'code', 'source', 'yaml', 'toml', 'dockerfile', 'makefile', 'cmake', 'gradle', 'maven']): # Programming languages, configuration files, and build files 
            return self._chunkCode(content)
        elif mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
            # Word documents with markdown formatting
            return self._chunkWordDocument(content)
        elif mimeType == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
            # Excel documents with structured data
            return self._chunkExcelDocument(content)
        else:
            return self._chunkPlainText(content)

    def _chunkPlainText(self, content: str) -> List[str]:
        """Chunk plain text content"""
        chunks = []
        currentChunk = []
        currentSize = 0
        for line in content.split('\n'):
            lineSize = len(line.encode('utf-8'))
            if currentSize + lineSize > self.chunkSizes["plain"]:
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks

    def _chunkCsv(self, content: str) -> List[str]:
        """Chunk CSV content"""
        chunks = []
        currentChunk = []
        currentSize = 0
        for line in content.split('\n'):
            lineSize = len(line.encode('utf-8'))
            if currentSize + lineSize > self.chunkSizes["csv"]:
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks

    def _chunkJson(self, content: str) -> List[str]:
        """Chunk JSON content"""
        try:
            data = json.loads(content)
            chunks = []
            currentChunk = []
            currentSize = 0

            def processValue(value, path=""):
                nonlocal currentChunk, currentSize
                valueStr = json.dumps({path: value}) if path else json.dumps(value)
                valueSize = len(valueStr.encode('utf-8'))
                if currentSize + valueSize > self.chunkSizes["json"]:
                    if currentChunk:
                        chunks.append(json.dumps(currentChunk))
                    currentChunk = [value]
                    currentSize = valueSize
                else:
                    currentChunk.append(value)
                    currentSize += valueSize

            if isinstance(data, list):
                for i, item in enumerate(data):
                    processValue(item, str(i))
            elif isinstance(data, dict):
                for key, value in data.items():
                    processValue(value, key)
            else:
                processValue(data)

            if currentChunk:
                chunks.append(json.dumps(currentChunk))
            return chunks
        except json.JSONDecodeError:
            return [content]

    def _chunkXml(self, content: str) -> List[str]:
        """Chunk XML content"""
        try:
            root = ET.fromstring(content)
            chunks = []
            currentChunk = []
            currentSize = 0

            def processElement(element, path=""):
                nonlocal currentChunk, currentSize
                elementStr = ET.tostring(element, encoding='unicode')
                elementSize = len(elementStr.encode('utf-8'))
                if currentSize + elementSize > self.chunkSizes["xml"]:
                    if currentChunk:
                        chunks.append(''.join(currentChunk))
                    currentChunk = [elementStr]
                    currentSize = elementSize
                else:
                    currentChunk.append(elementStr)
                    currentSize += elementSize

            for child in root:
                processElement(child)

            if currentChunk:
                chunks.append(''.join(currentChunk))
            return chunks
        except ET.ParseError:
            return [content]

    def _chunkHtml(self, content: str) -> List[str]:
        """Chunk HTML content with improved semantic chunking"""
        try:
            soup = BeautifulSoup(content, 'html.parser')
            chunks = []
            currentChunk = []
            currentSize = 0

            # Use smaller chunk size for HTML to avoid token limits
            html_chunk_size = min(self.chunkSizes["html"], 15000)  # Max 15KB per chunk

            def processElement(element):
                nonlocal currentChunk, currentSize
                elementStr = str(element)
                elementSize = len(elementStr.encode('utf-8'))

                # If element is too large, split it
                if elementSize > html_chunk_size:
                    # Split large elements by their content
                    if hasattr(element, 'get_text'):
                        text_content = element.get_text(separator='\n', strip=True)
                        if text_content:
                            # Split text content into smaller chunks
                            text_chunks = self._chunkTextBySize(text_content, html_chunk_size)
                            for text_chunk in text_chunks:
                                if currentChunk:
                                    chunks.append(''.join(currentChunk))
                                currentChunk = [f"<{element.name}>{text_chunk}"]
                                currentSize = len(currentChunk[0].encode('utf-8'))
                    else:
                        # For elements without text, just add them
                        if currentChunk:
                            chunks.append(''.join(currentChunk))
                        currentChunk = [elementStr]
                        currentSize = elementSize
                elif currentSize + elementSize > html_chunk_size:
                    if currentChunk:
                        chunks.append(''.join(currentChunk))
                    currentChunk = [elementStr]
                    currentSize = elementSize
                else:
                    currentChunk.append(elementStr)
                    currentSize += elementSize

            # Process elements in order of importance
            for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
                processElement(element)
            for element in soup.find_all(['p', 'div', 'section', 'article']):
                processElement(element)
            for element in soup.find_all(['ul', 'ol', 'table']):
                processElement(element)

            # Process remaining elements
            for element in soup.find_all():
                if element.name not in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'section', 'article', 'ul', 'ol', 'table']:
                    processElement(element)

            if currentChunk:
                chunks.append(''.join(currentChunk))
            return chunks
        except Exception:
            return [content]

    def _chunkTextBySize(self, text: str, max_size: int) -> List[str]:
        """Helper method to chunk text by size"""
        chunks = []
        current_chunk = ""
        for line in text.split('\n'):
            line_size = len(line.encode('utf-8'))
            if len(current_chunk.encode('utf-8')) + line_size > max_size:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = line
            else:
                current_chunk += "\n" + line if current_chunk else line
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks

    def _chunkMarkdown(self, content: str) -> List[str]:
        """Chunk Markdown content"""
        chunks = []
        currentChunk = []
        currentSize = 0

        # Split by headers, lists, and code blocks.
        # This is a simplified approach; a more robust solution would involve a proper Markdown parser.
        lines = content.split('\n')
        for line in lines:
            lineSize = len(line.encode('utf-8'))
            if currentSize + lineSize > self.chunkSizes["text"]:  # Use "text" chunk size for Markdown
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks

    def _chunkCode(self, content: str) -> List[str]:
        """Chunk code content with optimized chunking for programming languages"""
        chunks = []
        currentChunk = []
        currentSize = 0

        # Use larger chunk size for code to minimize unnecessary splitting;
        # code files often have long lines and complex structures
        code_chunk_size = min(self.chunkSizes["code"], 80000)  # Max 80KB per chunk for code

        # Split by lines to preserve code structure
        lines = content.split('\n')
        for line in lines:
            lineSize = len(line.encode('utf-8'))
            if currentSize + lineSize > code_chunk_size:
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks

    def _chunkJavaScript(self, content: str) -> List[str]:
        """Chunk JavaScript content with optimized chunking for JavaScript files"""
        chunks = []
        currentChunk = []
        currentSize = 0

        # Use larger chunk size for JavaScript to minimize unnecessary splitting;
        # JavaScript files often have long lines and complex structures
        js_chunk_size = min(self.chunkSizes["javascript"], 80000)  # Max 80KB per chunk for JavaScript

        # Split by lines to preserve code structure
        lines = content.split('\n')
        for line in lines:
            lineSize = len(line.encode('utf-8'))
            if currentSize + lineSize > js_chunk_size:
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks

    def _chunkBinary(self, content: str) -> List[str]:
        """Chunk binary content"""
        try:
            # Check if content is base64 encoded or plain text
            try:
                # Try to decode as base64
                binaryData = base64.b64decode(content)
                # If successful, it's base64 - chunk the binary data
                chunks = []
                chunkSize = self.chunkSizes["binary"]
                for i in range(0, len(binaryData), chunkSize):
                    chunk = binaryData[i:i + chunkSize]
                    chunks.append(base64.b64encode(chunk).decode('utf-8'))
                return chunks
            except Exception:
                # If base64 decoding fails, treat as text and chunk by lines
                lines = content.split('\n')
                chunks = []
                currentChunk = []
                currentSize = 0
                for line in lines:
                    lineSize = len(line.encode('utf-8'))
                    if currentSize + lineSize > self.chunkSizes["binary"]:
                        if currentChunk:
                            chunks.append('\n'.join(currentChunk))
                        currentChunk = [line]
                        currentSize = lineSize
                    else:
                        currentChunk.append(line)
                        currentSize += lineSize
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                return chunks
        except Exception:
            return [content]

    # The four chunkers below are called synchronously from _aiDataExtraction and
    # never await anything, so they are plain methods (declaring them async would
    # hand the caller an un-iterable coroutine instead of a list).
    def _chunkPdf(self, content: str) -> List[str]:
        """Chunk PDF content"""
        try:
            # Content is already text from _processPdf, not base64.
            # Split by lines to create chunks.
            lines = content.split('\n')
            chunks = []
            currentChunk = []
            currentSize = 0
            for line in lines:
                lineSize = len(line.encode('utf-8'))
                if currentSize + lineSize > self.chunkSizes["pdf"]:
                    if currentChunk:
                        chunks.append('\n'.join(currentChunk))
                    currentChunk = [line]
                    currentSize = lineSize
                else:
                    currentChunk.append(line)
                    currentSize += lineSize
            if currentChunk:
                chunks.append('\n'.join(currentChunk))
            return chunks
        except Exception:
            return [content]

    def _chunkDocx(self, content: str) -> List[str]:
        """Chunk Word document content"""
        try:
            # Content is already text from _processDocx, not base64.
            # Split by lines to create chunks.
            lines = content.split('\n')
            chunks = []
            currentChunk = []
            currentSize = 0
            for line in lines:
                lineSize = len(line.encode('utf-8'))
                if currentSize + lineSize > self.chunkSizes["docx"]:
                    if currentChunk:
                        chunks.append('\n'.join(currentChunk))
                    currentChunk = [line]
                    currentSize = lineSize
                else:
                    currentChunk.append(line)
                    currentSize += lineSize
            if currentChunk:
                chunks.append('\n'.join(currentChunk))
            return chunks
        except Exception:
            return [content]

    def _chunkXlsx(self, content: str) -> List[str]:
        """Chunk Excel document content"""
        try:
            # Content is already text (CSV format) from _processXlsx, not base64.
            # Split by lines to create chunks.
            lines = content.split('\n')
            chunks = []
            currentChunk = []
            currentSize = 0
            for line in lines:
                lineSize = len(line.encode('utf-8'))
                if currentSize + lineSize > self.chunkSizes["xlsx"]:
                    if currentChunk:
                        chunks.append('\n'.join(currentChunk))
                    currentChunk = [line]
                    currentSize = lineSize
                else:
                    currentChunk.append(line)
                    currentSize += lineSize
            if currentChunk:
                chunks.append('\n'.join(currentChunk))
            return chunks
        except Exception:
            return [content]

    def _chunkPptx(self, content: str) -> List[str]:
        """Chunk PowerPoint document content"""
        try:
            # Content is already text from PowerPoint processing, not base64.
            # Split by lines to create chunks.
            lines = content.split('\n')
            chunks = []
            currentChunk = []
            currentSize = 0
            for line in lines:
                lineSize = len(line.encode('utf-8'))
                if currentSize + lineSize > self.chunkSizes["pptx"]:
                    if currentChunk:
                        chunks.append('\n'.join(currentChunk))
                    currentChunk = [line]
                    currentSize = lineSize
                else:
                    currentChunk.append(line)
                    currentSize += lineSize
            if currentChunk:
                chunks.append('\n'.join(currentChunk))
            return chunks
        except Exception:
            return [content]

    def _chunkWordDocument(self, content: str) -> List[str]:
        """Chunk Word document content with markdown formatting preservation"""
        chunks = []
        currentChunk = []
        currentSize = 0

        # Use larger chunk size for Word documents to preserve formatting
        word_chunk_size = min(self.chunkSizes["docx"], 60000)  # Max 60KB per chunk

        # Split by lines to preserve document structure
        lines = content.split('\n')
        for line in lines:
            lineSize = len(line.encode('utf-8'))
            # Check if adding this line would exceed chunk size
            if currentSize + lineSize > word_chunk_size:
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize

        # Add the last chunk if it exists
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks

    def _chunkExcelDocument(self, content: str) -> List[str]:
        """Chunk Excel document content with data structure preservation"""
        chunks = []
        currentChunk = []
        currentSize = 0

        # Use larger chunk size for Excel documents to preserve table structure
        excel_chunk_size = min(self.chunkSizes["xlsx"], 80000)  # Max 80KB per chunk

        # Split by lines to preserve CSV structure
        lines = content.split('\n')
        for line in lines:
            lineSize = len(line.encode('utf-8'))
            # Check if adding this line would exceed chunk size
            if currentSize + lineSize > excel_chunk_size:
                if currentChunk:
                    chunks.append('\n'.join(currentChunk))
                currentChunk = [line]
                currentSize = lineSize
            else:
                currentChunk.append(line)
                currentSize += lineSize

        # Add the last chunk if it exists
        if currentChunk:
            chunks.append('\n'.join(currentChunk))
        return chunks
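

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): run a plain-text file through the
    # extractor with AI post-processing disabled, since no serviceCenter is wired
    # up here. The file path is hypothetical; any readable text file works.
    import asyncio
    import sys

    async def _demo(path: str) -> None:
        extractor = DocumentExtraction()
        with open(path, "rb") as f:
            data = f.read()
        result = await extractor.processFileData(
            fileData=data,
            filename=path,
            mimeType="text/plain",
            enableAI=False,  # skip AI extraction; return the raw content items
        )
        for item in result.contents:
            print(f"{item.label}: {item.metadata.mimeType}, {item.metadata.size} bytes")

    asyncio.run(_demo(sys.argv[1] if len(sys.argv) > 1 else "sample.txt"))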