"""
Module for extracting content from various file formats.

Provides specialized functions for processing text, PDF, Office documents,
images, etc. Every extractor returns a list of "content" dictionaries with the
keys ``sequenceNr``, ``name``, ``ext``, ``contentType``, ``data``,
``base64Encoded`` and ``metadata``.
"""

import base64
import io
import logging
import os
from typing import Dict, Any, List, Optional, Union, Tuple

# Configure logger
logger = logging.getLogger(__name__)

# Optional imports - only loaded when needed
pdfExtractorLoaded = False
officeExtractorLoaded = False
imageProcessorLoaded = False

# MIME type assumed when the caller supplies none
_DEFAULT_MIME_TYPE = "application/octet-stream"

_DOCX_MIME_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
_XLSX_MIME_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
_PPTX_MIME_TYPE = "application/vnd.openxmlformats-officedocument.presentationml.presentation"

# Non-"text/*" MIME types that are nevertheless handled as plain text
_TEXT_LIKE_MIME_TYPES = frozenset({
    "application/json",
    "application/xml",
    "application/javascript",
    "application/x-python",
})
_WORD_MIME_TYPES = frozenset({_DOCX_MIME_TYPE, "application/msword"})
_EXCEL_MIME_TYPES = frozenset({_XLSX_MIME_TYPE, "application/vnd.ms-excel"})
_POWERPOINT_MIME_TYPES = frozenset({_PPTX_MIME_TYPE, "application/vnd.ms-powerpoint"})

# Encodings tried, in order, when UTF-8 decoding fails. cp1252 must come before
# latin-1: latin-1 maps every byte and therefore never fails, so anything
# listed after it would be unreachable.
_FALLBACK_ENCODINGS = ("cp1252", "latin-1")


def _fileExtension(fileName: str, default: str) -> str:
    """Return the extension of fileName without the leading dot, or default if it has none."""
    extension = os.path.splitext(fileName)[1][1:]
    return extension or default


def _normalizeMimeType(rawMimeType: Any) -> str:
    """Return a lower-cased MIME type without parameters, or the default type if missing."""
    if not rawMimeType:
        return _DEFAULT_MIME_TYPE
    # "text/plain; charset=utf-8" -> "text/plain"
    essence = str(rawMimeType).split(";", 1)[0].strip().lower()
    return essence or _DEFAULT_MIME_TYPE


def _binaryContent(
    name: str,
    ext: str,
    contentType: str,
    fileContent: bytes,
    formatLabel: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a single base64-encoded content item that carries the raw file bytes."""
    metadata: Dict[str, Any] = {"isText": False}
    if formatLabel is not None:
        metadata["format"] = formatLabel
    return {
        "sequenceNr": 1,
        "name": name,
        "ext": ext,
        "contentType": contentType,
        "data": base64.b64encode(fileContent).decode('utf-8'),
        "base64Encoded": True,
        "metadata": metadata,
    }


def _decodeText(fileContent: bytes, fileName: str, label: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Decode bytes to text, trying UTF-8 first and then the fallback encodings.

    Args:
        fileContent: Binary data to decode
        fileName: Name of the file (used for log messages only)
        label: Kind of content for log messages ("text" or "CSV")

    Returns:
        Tuple (text, encoding). ``encoding`` is None when UTF-8 worked;
        ``text`` is None when no encoding could decode the data.
    """
    try:
        # utf-8-sig also accepts plain UTF-8 and strips a leading BOM
        return fileContent.decode('utf-8-sig'), None
    except UnicodeDecodeError:
        logger.warning(
            f"Could not decode {label} from file '{fileName}' as UTF-8, trying alternative encodings"
        )

    for encoding in _FALLBACK_ENCODINGS:
        try:
            text = fileContent.decode(encoding)
        except UnicodeDecodeError:
            continue
        logger.info(f"{label[:1].upper()}{label[1:]} successfully decoded with encoding {encoding}")
        return text, encoding

    return None, None


def _dispatchExtraction(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """Call the extractor that matches the (already normalized) MIME type."""
    # CSV has its own handler and must be checked before the generic text branch
    if mimeType == "text/csv":
        return extractCsvContent(fileName, fileContent)
    if mimeType.startswith("text/") or mimeType in _TEXT_LIKE_MIME_TYPES:
        return extractTextContent(fileName, fileContent, mimeType)
    # SVG must be checked before the generic image branch
    if mimeType == "image/svg+xml":
        return extractSvgContent(fileName, fileContent)
    if mimeType.startswith("image/"):
        return extractImageContent(fileName, fileContent, mimeType)
    if mimeType == "application/pdf":
        return extractPdfContent(fileName, fileContent)
    if mimeType in _WORD_MIME_TYPES:
        return extractWordContent(fileName, fileContent, mimeType)
    if mimeType in _EXCEL_MIME_TYPES:
        return extractExcelContent(fileName, fileContent, mimeType)
    if mimeType in _POWERPOINT_MIME_TYPES:
        return extractPowerpointContent(fileName, fileContent, mimeType)
    # Binary data as fallback for unknown formats
    return extractBinaryContent(fileName, fileContent, mimeType)


def getDocumentContents(fileMetadata: Dict[str, Any], fileContent: bytes) -> List[Dict[str, Any]]:
    """
    Main function for extracting content from a file based on its MIME type.
    Delegates to specialized extraction functions.

    The MIME type is normalized before dispatching (parameters such as
    ``; charset=...`` are dropped and the type is lower-cased); a missing or
    empty MIME type is treated as ``application/octet-stream``.

    Args:
        fileMetadata: File metadata; the keys "name" and "mimeType" are read
        fileContent: Binary data of the file (None is treated as empty)

    Returns:
        List of Document-Content objects. Every item has a top-level
        "base64Encoded" flag, mirrored in item["metadata"]["base64Encoded"]
        for backward compatibility. On any error a single base64-encoded item
        with the original bytes is returned instead of raising.
    """
    if fileContent is None:
        fileContent = b""

    try:
        mimeType = _normalizeMimeType(fileMetadata.get("mimeType"))
        fileName = fileMetadata.get("name") or "unknown"

        logger.info(f"Extracting content from file '{fileName}' (MIME type: {mimeType})")

        contents = list(_dispatchExtraction(fileName, fileContent, mimeType))

        # Fallback when no content could be extracted
        if not contents:
            logger.warning(f"No content extracted from file '{fileName}', using binary fallback")
            contents.append(
                _binaryContent("1_undefined", _fileExtension(fileName, "bin"), mimeType, fileContent)
            )

        # Add generic attributes for all documents
        for content in contents:
            if "base64Encoded" not in content:
                if isinstance(content.get("data"), bytes):
                    # Raw bytes cannot be serialized as text, so encode them
                    content["data"] = base64.b64encode(content["data"]).decode('utf-8')
                    content["base64Encoded"] = True
                else:
                    # Assume text content if not explicitly marked
                    content["base64Encoded"] = False

            # Older consumers read the flag from the metadata dictionary
            content.setdefault("metadata", {})["base64Encoded"] = content["base64Encoded"]

        logger.info(f"Successfully extracted {len(contents)} content items from file '{fileName}'")
        return contents

    except Exception as e:
        logger.exception(f"Error during content extraction: {str(e)}")

        # Fallback on error - return original data
        fallback = _binaryContent(
            fileMetadata.get("name", "unknown"),
            _fileExtension(fileMetadata.get("name") or "", "bin"),
            fileMetadata.get("mimeType", _DEFAULT_MIME_TYPE),
            fileContent,
        )
        fallback["metadata"]["base64Encoded"] = True  # For backward compatibility
        return [fallback]


def _loadPdfExtractor():
    """Loads PDF extraction libraries (PyPDF2, PyMuPDF) when needed."""
    global pdfExtractorLoaded

    if not pdfExtractorLoaded:
        try:
            global PyPDF2, fitz
            import PyPDF2
            import fitz  # PyMuPDF for more extensive PDF processing
            pdfExtractorLoaded = True
            logger.info("PDF extraction libraries successfully loaded")
        except ImportError as e:
            logger.warning(f"PDF extraction libraries could not be loaded: {e}")


def _loadOfficeExtractor():
    """Loads Office document extraction libraries (python-docx, openpyxl) when needed."""
    global officeExtractorLoaded

    if not officeExtractorLoaded:
        try:
            global docx, openpyxl
            import docx  # python-docx for Word documents
            import openpyxl  # for Excel files
            officeExtractorLoaded = True
            logger.info("Office extraction libraries successfully loaded")
        except ImportError as e:
            logger.warning(f"Office extraction libraries could not be loaded: {e}")


def _loadImageProcessor():
    """Loads image processing libraries (Pillow) when needed."""
    global imageProcessorLoaded

    if not imageProcessorLoaded:
        try:
            global Image
            from PIL import Image
            imageProcessorLoaded = True
            logger.info("Image processing libraries successfully loaded")
        except ImportError as e:
            logger.warning(f"Image processing libraries could not be loaded: {e}")


def extractTextContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """
    Extracts text from text files.

    UTF-8 (with or without BOM) is tried first, then cp1252 and latin-1. When
    a fallback encoding was needed, its name is stored in metadata["encoding"].

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file
        mimeType: MIME type of the file (only used for the binary fallback)

    Returns:
        List with one Text-Content object (base64Encoded = False), or one
        base64-encoded binary object if the data could not be decoded
    """
    # Keep original file extension
    fileExtension = _fileExtension(fileName, "txt")

    textContent, encoding = _decodeText(fileContent, fileName, "text")
    if textContent is None:
        logger.warning(f"Could not decode text, using binary data")
        return [_binaryContent("1_binary", fileExtension, mimeType, fileContent)]

    metadata: Dict[str, Any] = {"isText": True}
    if encoding is not None:
        metadata["encoding"] = encoding

    return [{
        "sequenceNr": 1,
        "name": "1_text",
        "ext": fileExtension,
        "contentType": "text/plain",
        "data": textContent,
        "base64Encoded": False,
        "metadata": metadata,
    }]


def extractCsvContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]:
    """
    Extracts content from CSV files.

    Uses the same decoding strategy as extractTextContent; the CSV text is
    returned unparsed.

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file

    Returns:
        List with one CSV-Content object (base64Encoded = False), or one
        base64-encoded binary object if the data could not be decoded
    """
    csvContent, encoding = _decodeText(fileContent, fileName, "CSV")
    if csvContent is None:
        return [_binaryContent("1_binary", "csv", "text/csv", fileContent)]

    metadata: Dict[str, Any] = {"isText": True}
    if encoding is not None:
        metadata["encoding"] = encoding
    metadata["format"] = "csv"

    return [{
        "sequenceNr": 1,
        "name": "1_csv",
        "ext": "csv",
        "contentType": "text/csv",
        "data": csvContent,
        "base64Encoded": False,
        "metadata": metadata,
    }]


def extractSvgContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]:
    """
    Extracts content from SVG files.

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file

    Returns:
        List with one SVG-Content object holding the SVG markup as text, or
        one base64-encoded binary object if the data is not decodable SVG
    """
    # NOTE(review): the original body of this function was damaged in the
    # source this rewrite is based on - everything after the '"<svg"' check
    # was lost. The code below is a conservative reconstruction; the item
    # name and the metadata keys ("isImage", "format") are assumptions and
    # must be confirmed against the original implementation / consumers.
    try:
        # Extract SVG as text content (XML)
        svgText = fileContent.decode('utf-8')
    except UnicodeDecodeError:
        logger.warning(f"Could not decode SVG from file '{fileName}' as UTF-8, using binary data")
        return [_binaryContent("1_binary", "svg", "image/svg+xml", fileContent)]

    # Check if it's actually SVG by looking for the SVG tag
    if "<svg" not in svgText:
        logger.warning(f"File '{fileName}' does not contain an SVG tag, using binary data")
        return [_binaryContent("1_binary", "svg", "image/svg+xml", fileContent)]

    return [{
        "sequenceNr": 1,
        "name": "1_svg",
        "ext": "svg",
        "contentType": "image/svg+xml",
        "data": svgText,
        "base64Encoded": False,
        "metadata": {
            "isText": True,
            "isImage": True,
            "format": "svg",
        },
    }]


def extractImageContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """
    Extracts content from image files and optionally generates metadata descriptions.

    If Pillow is available the image is inspected for format, mode, size and
    EXIF tags. Inspection failures are recorded in metadata["error"] and never
    prevent the image bytes from being returned.

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file
        mimeType: MIME type of the file

    Returns:
        List with the base64-encoded image (base64Encoded = True), followed by
        a short plain-text description item when one could be generated
    """
    # Derive the file extension from the MIME subtype
    fileExtension = mimeType.split('/')[-1]
    if fileExtension == "jpeg":
        fileExtension = "jpg"

    imageMetadata: Dict[str, Any] = {
        "isText": False,
        "format": "image"
    }
    imageDescription = None

    try:
        _loadImageProcessor()

        if imageProcessorLoaded and fileContent:
            with io.BytesIO(fileContent) as imgStream:
                try:
                    # verify() checks integrity but leaves the image object
                    # unusable, so the image is opened a second time afterwards
                    Image.open(imgStream).verify()
                    imgStream.seek(0)
                    img = Image.open(imgStream)

                    imageMetadata.update({
                        "format": img.format,
                        "mode": img.mode,
                        "width": img.width,
                        "height": img.height
                    })

                    # Extract EXIF data if available
                    if hasattr(img, '_getexif') and callable(img._getexif):
                        exif = img._getexif()
                        if exif:
                            imageMetadata["exif"] = {
                                f"tag_{tagId}": str(value) for tagId, value in exif.items()
                            }

                    # Generate image description
                    imageDescription = f"Image ({img.width}x{img.height}, {img.format}, {img.mode})"
                except Exception as innerE:
                    logger.warning(f"Error processing image: {str(innerE)}")
                    imageMetadata["error"] = str(innerE)
                    imageDescription = f"Image (unable to process: {str(innerE)})"
    except Exception as e:
        logger.warning(f"Could not extract image metadata: {str(e)}")
        imageMetadata["error"] = str(e)

    contents = [{
        "sequenceNr": 1,
        "name": "1_image",
        "ext": fileExtension,
        "contentType": mimeType,
        "data": base64.b64encode(fileContent).decode('utf-8'),
        "base64Encoded": True,
        "metadata": imageMetadata
    }]

    # If image description available, add as additional text content
    if imageDescription:
        contents.append({
            "sequenceNr": 2,
            "name": "2_text_image_info",
            "ext": "txt",
            "contentType": "text/plain",
            "data": imageDescription,
            "base64Encoded": False,
            "metadata": {
                "isText": True,
                "imageDescription": True
            }
        })

    return contents


def _appendPdfText(fileContent: bytes, contents: List[Dict[str, Any]]) -> None:
    """Append one text item with the text of all PDF pages (via PyPDF2), if any text exists."""
    pdfMetadata: Dict[str, Any] = {}
    pageTexts: List[str] = []

    with io.BytesIO(fileContent) as pdfStream:
        pdfReader = PyPDF2.PdfReader(pdfStream)

        # Document information keys look like "/Title"; drop the leading slash
        for key, value in (pdfReader.metadata or {}).items():
            pdfMetadata[key[1:] if key.startswith('/') else key] = value

        # Read the page count while the stream is still open
        pageCount = len(pdfReader.pages)

        for pageNum, page in enumerate(pdfReader.pages, start=1):
            pageText = page.extract_text()
            if pageText:
                pageTexts.append(f"--- Page {pageNum} ---\n{pageText}\n\n")

    extractedText = "".join(pageTexts)
    if not extractedText.strip():
        return

    position = len(contents) + 1
    contents.append({
        "sequenceNr": position,
        "name": f"{position}_text",
        "ext": "txt",
        "contentType": "text/plain",
        "data": extractedText,
        "base64Encoded": False,
        "metadata": {
            "isText": True,
            "source": "pdf",
            "pages": pageCount,
            "pdfMetadata": pdfMetadata
        }
    })


def _appendPdfImages(fileContent: bytes, contents: List[Dict[str, Any]]) -> None:
    """Append one base64-encoded item per image embedded in the PDF (via PyMuPDF)."""
    with io.BytesIO(fileContent) as pdfStream:
        doc = fitz.open(stream=pdfStream, filetype="pdf")
        try:
            for pageNum in range(len(doc)):
                imageList = doc[pageNum].get_images(full=True)

                for imgIndex, imgInfo in enumerate(imageList):
                    try:
                        # The first entry of the image tuple is the xref number
                        baseImage = doc.extract_image(imgInfo[0])
                        imageExt = baseImage["ext"]
                        position = len(contents) + 1

                        contents.append({
                            "sequenceNr": position,
                            "name": f"{position}_image_page{pageNum+1}_{imgIndex+1}",
                            "ext": imageExt,
                            "contentType": f"image/{imageExt}",
                            "data": base64.b64encode(baseImage["image"]).decode('utf-8'),
                            "base64Encoded": True,
                            "metadata": {
                                "isText": False,
                                "source": "pdf",
                                "page": pageNum + 1,
                                "index": imgIndex
                            }
                        })
                    except Exception as imgE:
                        # A single broken image must not abort the remaining ones
                        logger.warning(f"Error extracting image {imgIndex} on page {pageNum + 1}: {str(imgE)}")
        finally:
            # Always release the document, even if a page could not be read
            doc.close()


def extractPdfContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]:
    """
    Extracts text and images from PDF files.

    Text and image extraction are independent: a failure in one does not
    prevent the other. Items are numbered in the order they are found (text
    first, then images).

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file

    Returns:
        List of PDF-Content objects (text and images) with appropriate
        base64Encoded flags. If the libraries are unavailable or nothing could
        be extracted, the list holds the original PDF as one base64 item.
    """
    contents: List[Dict[str, Any]] = []

    try:
        _loadPdfExtractor()
    except Exception as e:
        logger.error(f"Error in PDF extraction: {str(e)}")

    if not pdfExtractorLoaded:
        logger.warning("PDF extraction not possible: Libraries not available")
        return [_binaryContent("1_pdf", "pdf", "application/pdf", fileContent, "pdf")]

    # Extract text with PyPDF2
    try:
        _appendPdfText(fileContent, contents)
    except Exception as e:
        logger.error(f"Error in PDF extraction: {str(e)}")

    # Extract images with PyMuPDF (fitz)
    try:
        _appendPdfImages(fileContent, contents)
    except Exception as imgExtractE:
        logger.warning(f"Error extracting images from PDF: {str(imgExtractE)}")

    # If no content was extracted, add the original PDF
    if not contents:
        contents.append(_binaryContent("1_pdf", "pdf", "application/pdf", fileContent, "pdf"))

    return contents


def _extractDocxText(fileContent: bytes) -> Optional[Dict[str, Any]]:
    """Return a text item with all paragraph and table text of a DOCX file, or None if empty."""
    with io.BytesIO(fileContent) as docxStream:
        doc = docx.Document(docxStream)

        fullText = [para.text for para in doc.paragraphs]

        # Each table row becomes one line with cells separated by " | "
        for table in doc.tables:
            for row in table.rows:
                fullText.append(" | ".join(cell.text for cell in row.cells))

        extractedText = "\n\n".join(fullText)
        if not extractedText.strip():
            return None

        return {
            "sequenceNr": 1,
            "name": "1_text",
            "ext": "txt",
            "contentType": "text/plain",
            "data": extractedText,
            "base64Encoded": False,
            "metadata": {
                "isText": True,
                "source": "docx",
                "paragraphCount": len(doc.paragraphs),
                "tableCount": len(doc.tables)
            }
        }


def extractWordContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """
    Extracts text from Word documents.

    Only the DOCX format is parsed; legacy DOC files are returned unchanged.

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file
        mimeType: MIME type of the file

    Returns:
        List with one text item (base64Encoded = False), or the original
        document as one base64 item if nothing could be extracted
    """
    isDocx = mimeType == _DOCX_MIME_TYPE
    fileExtension = "docx" if isDocx else "doc"

    def originalDocument() -> List[Dict[str, Any]]:
        # Built lazily so the file is only base64-encoded when really needed
        return [_binaryContent("1_word", fileExtension, mimeType, fileContent, "word")]

    if not isDocx:
        logger.warning(f"Extraction from old Word format (DOC) not supported")
        return originalDocument()

    try:
        _loadOfficeExtractor()

        if not officeExtractorLoaded:
            logger.warning("Word extraction not possible: Libraries not available")
            return originalDocument()

        textContent = _extractDocxText(fileContent)
        if textContent is not None:
            return [textContent]
    except Exception as e:
        logger.error(f"Error in Word extraction: {str(e)}")

    # If no content was extracted, add the original document
    return originalDocument()


def _csvField(value: Any) -> str:
    """Render one cell value as a double-quoted CSV field (None becomes an empty field)."""
    text = "" if value is None else str(value).replace('"', '""')
    return f'"{text}"'


def _appendXlsxSheets(fileContent: bytes, contents: List[Dict[str, Any]]) -> None:
    """Append one CSV item per non-empty worksheet of an XLSX workbook."""
    with io.BytesIO(fileContent) as xlsxStream:
        # data_only=True yields cached formula results instead of formulas
        workbook = openpyxl.load_workbook(xlsxStream, data_only=True)
        try:
            for sheetName in workbook.sheetnames:
                sheet = workbook[sheetName]

                csvContent = "\n".join(
                    ','.join(_csvField(cell.value) for cell in row)
                    for row in sheet.iter_rows()
                )
                if not csvContent.strip():
                    continue

                sheetSafeName = sheetName.replace(" ", "_").replace("/", "_").replace("\\", "_")
                position = len(contents) + 1
                contents.append({
                    "sequenceNr": position,
                    "name": f"{position}_csv_{sheetSafeName}",
                    "ext": "csv",
                    "contentType": "text/csv",
                    "data": csvContent,
                    "base64Encoded": False,
                    "metadata": {
                        "isText": True,
                        "source": "xlsx",
                        "sheet": sheetName,
                        "format": "csv"
                    }
                })
        finally:
            workbook.close()


def extractExcelContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """
    Extracts table data from Excel files.

    Only the XLSX format is parsed; every non-empty worksheet becomes one CSV
    item in which all fields are double-quoted. Legacy XLS files are returned
    unchanged.

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file
        mimeType: MIME type of the file

    Returns:
        List of CSV items (base64Encoded = False), or the original workbook
        as one base64 item if nothing could be extracted
    """
    isXlsx = mimeType == _XLSX_MIME_TYPE
    fileExtension = "xlsx" if isXlsx else "xls"
    contents: List[Dict[str, Any]] = []

    if not isXlsx:
        logger.warning(f"Extraction from old Excel format (XLS) not supported")
    else:
        try:
            _loadOfficeExtractor()

            if officeExtractorLoaded:
                _appendXlsxSheets(fileContent, contents)
            else:
                logger.warning("Excel extraction not possible: Libraries not available")
        except Exception as e:
            logger.error(f"Error in Excel extraction: {str(e)}")

    # If no content was extracted, add the original document
    if not contents:
        contents.append(_binaryContent("1_excel", fileExtension, mimeType, fileContent, "excel"))

    return contents


def extractPowerpointContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """
    Extracts content from PowerPoint presentations.

    No parsing is done; the original file is returned base64-encoded.

    Args:
        fileName: Name of the file
        fileContent: Binary data of the file
        mimeType: MIME type of the file

    Returns:
        List with one PowerPoint-Content object with base64Encoded = True
    """
    # A complete extraction would require more specialized libraries
    fileExtension = "pptx" if mimeType == _PPTX_MIME_TYPE else "ppt"
    return [_binaryContent("1_powerpoint", fileExtension, mimeType, fileContent, "powerpoint")]


def extractBinaryContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
    """
    Fallback for binary files where no specific extraction is possible.

    Args:
        fileName: Name of the file (its extension is kept, default "bin")
        fileContent: Binary data of the file
        mimeType: MIME type of the file

    Returns:
        List with a binary Content object with base64Encoded = True
    """
    return [_binaryContent("1_binary", _fileExtension(fileName, "bin"), mimeType, fileContent, "binary")]