gateway/modules/chat_content_extraction.py

778 lines
No EOL
29 KiB
Python

"""
Module for extracting content from various file formats.
Provides specialized functions for processing text, PDF, Office documents, images, etc.
"""
import logging
import os
import io
from typing import Dict, Any, List, Optional, Union, Tuple
import base64
# Configure logger
logger = logging.getLogger(__name__)
# Optional imports - only loaded when needed
pdf_extractor_loaded = False
office_extractor_loaded = False
image_processor_loaded = False
def get_document_contents(file_metadata: Dict[str, Any], file_content: bytes) -> List[Dict[str, Any]]:
    """
    Extract the content items of a file, dispatching on its MIME type.

    Args:
        file_metadata: File metadata; the keys "mime_type" and "name" are read.
        file_content: Binary data of the file.

    Returns:
        List of content dicts (sequence_nr, name, ext, content_type, data,
        metadata). On the normal path every ``bytes`` payload is
        base64-encoded into a str and flagged with
        metadata["base64_encoded"] = True. If extraction raises, a single
        item holding the raw, unencoded bytes is returned instead.
    """
    try:
        # `or` also covers keys that are present but set to None.
        mime_type = file_metadata.get("mime_type") or "application/octet-stream"
        file_name = file_metadata.get("name") or "unknown"
        logger.info(f"Extracting content from file '{file_name}' (MIME type: {mime_type})")

        contents = []
        # CSV has to be tested before the generic "text/" prefix; otherwise
        # "text/csv" is swallowed by the plain-text branch and the CSV
        # extractor is never reached.
        if mime_type == "text/csv":
            contents.extend(extract_csv_content(file_name, file_content))
        # Text-based formats
        elif mime_type.startswith("text/") or mime_type in (
            "application/json",
            "application/xml",
            "application/javascript",
            "application/x-python",
        ):
            contents.extend(extract_text_content(file_name, file_content, mime_type))
        # Images
        elif mime_type.startswith("image/"):
            contents.extend(extract_image_content(file_name, file_content, mime_type))
        # PDF documents
        elif mime_type == "application/pdf":
            contents.extend(extract_pdf_content(file_name, file_content))
        # Word documents
        elif mime_type in (
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "application/msword",
        ):
            contents.extend(extract_word_content(file_name, file_content, mime_type))
        # Excel documents
        elif mime_type in (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "application/vnd.ms-excel",
        ):
            contents.extend(extract_excel_content(file_name, file_content, mime_type))
        # PowerPoint documents
        elif mime_type in (
            "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            "application/vnd.ms-powerpoint",
        ):
            contents.extend(extract_powerpoint_content(file_name, file_content, mime_type))
        # Binary data as fallback for unknown formats
        else:
            contents.extend(extract_binary_content(file_name, file_content, mime_type))

        # Fallback when the selected extractor returned nothing
        if not contents:
            logger.warning(f"No content extracted from file '{file_name}', using binary fallback")
            suffix = os.path.splitext(file_name)[1]
            contents.append({
                "sequence_nr": 1,
                "name": '1_undefined',
                "ext": suffix[1:] if suffix else "bin",
                "content_type": mime_type,
                "data": file_content,
                "metadata": {
                    "is_text": False
                }
            })

        # Binary payloads are base64-encoded into a str and flagged as such;
        # text payloads are left untouched.
        for content in contents:
            if isinstance(content.get("data"), bytes):
                content["data"] = base64.b64encode(content["data"]).decode('utf-8')
                if "metadata" not in content:
                    content["metadata"] = {}
                content["metadata"]["base64_encoded"] = True

        logger.info(f"Successfully extracted {len(contents)} content items from file '{file_name}'")
        return contents
    except Exception as e:
        logger.error(f"Error during content extraction: {str(e)}")
        # Fallback on error - return the original data (not base64-encoded)
        suffix = os.path.splitext(file_metadata.get("name", ""))[1]
        return [{
            "sequence_nr": 1,
            "name": file_metadata.get("name", "unknown"),
            "ext": suffix[1:] if suffix else "bin",
            "content_type": file_metadata.get("mime_type", "application/octet-stream"),
            "data": file_content,
            "metadata": {
                "is_text": False
            }
        }]
def _load_pdf_extractor():
    """Import the PDF libraries on first use and record success in a module flag."""
    global pdf_extractor_loaded
    global PyPDF2, fitz

    # Nothing to do once a previous call has imported both libraries.
    if pdf_extractor_loaded:
        return

    try:
        import PyPDF2
        import fitz  # PyMuPDF for more extensive PDF processing
    except ImportError as import_error:
        # The flag stays False, so callers can fall back to the raw file.
        logger.warning(f"PDF extraction libraries could not be loaded: {import_error}")
    else:
        pdf_extractor_loaded = True
        logger.info("PDF extraction libraries successfully loaded")
def _load_office_extractor():
    """Import the Office libraries on first use and record success in a module flag."""
    global office_extractor_loaded
    global docx, openpyxl

    # Nothing to do once a previous call has imported both libraries.
    if office_extractor_loaded:
        return

    try:
        import docx  # python-docx for Word documents
        import openpyxl  # for Excel files
    except ImportError as import_error:
        # The flag stays False, so callers can fall back to the raw file.
        logger.warning(f"Office extraction libraries could not be loaded: {import_error}")
    else:
        office_extractor_loaded = True
        logger.info("Office extraction libraries successfully loaded")
def _load_image_processor():
    """Import the image library on first use and record success in a module flag."""
    global image_processor_loaded
    # Only `Image` is actually bound by the import below; `PIL` is declared
    # global but never assigned here.
    global PIL, Image

    # Nothing to do once a previous call has imported the library.
    if image_processor_loaded:
        return

    try:
        from PIL import Image
    except ImportError as import_error:
        # The flag stays False, so callers skip image analysis.
        logger.warning(f"Image processing libraries could not be loaded: {import_error}")
    else:
        image_processor_loaded = True
        logger.info("Image processing libraries successfully loaded")
def extract_text_content(file_name: str, file_content: bytes, mime_type: str) -> List[Dict[str, Any]]:
    """
    Decode a text file into a single text content item.

    UTF-8 is tried first. If that fails, cp1252 is tried and then latin-1.
    latin-1 maps every byte value, so it must come last: placed first it
    would always succeed and no other fallback would ever be attempted.

    Args:
        file_name: Name of the file; its extension is kept ("txt" if none).
        file_content: Binary data of the file.
        mime_type: MIME type of the file; only used for the binary fallback.

    Returns:
        A one-element list. On success the item has metadata.is_text = True
        (plus metadata.encoding when a fallback encoding was needed);
        otherwise the raw bytes are returned with metadata.is_text = False.
    """
    # Keep original file extension
    suffix = os.path.splitext(file_name)[1]
    file_extension = suffix[1:] if suffix else "txt"

    try:
        text_content = file_content.decode('utf-8')
    except UnicodeDecodeError:
        logger.warning(f"Could not decode text from file '{file_name}' as UTF-8, trying alternative encodings")
    else:
        return [{
            "sequence_nr": 1,
            "name": "1_text",
            "ext": file_extension,
            "content_type": "text",
            "data": text_content,
            "metadata": {
                "is_text": True
            }
        }]

    try:
        # cp1252 leaves a few byte values undefined and can therefore fail;
        # latin-1 is the catch-all and has to be the final candidate.
        for encoding in ('cp1252', 'latin-1'):
            try:
                text_content = file_content.decode(encoding)
            except UnicodeDecodeError:
                continue
            logger.info(f"Text successfully decoded with encoding {encoding}")
            return [{
                "sequence_nr": 1,
                "name": "1_text",
                "ext": file_extension,
                "content_type": "text",
                "data": text_content,
                "metadata": {
                    "is_text": True,
                    "encoding": encoding
                }
            }]
        # Only reachable if the candidate list above no longer ends in latin-1.
        logger.warning("Could not decode text, using binary data")
    except Exception as e:
        logger.error(f"Error in alternative text decoding: {str(e)}")

    # Return binary data as fallback
    return [{
        "sequence_nr": 1,
        "name": "1_binary",
        "ext": file_extension,
        "content_type": mime_type,
        "data": file_content,
        "metadata": {
            "is_text": False
        }
    }]
def extract_csv_content(file_name: str, file_content: bytes) -> List[Dict[str, Any]]:
    """
    Decode a CSV file into a single CSV content item.

    UTF-8 is tried first. If that fails, cp1252 is tried and then latin-1.
    latin-1 maps every byte value, so it must come last: placed first it
    would always succeed and no other fallback would ever be attempted.

    Args:
        file_name: Name of the file (used in log messages only).
        file_content: Binary data of the file.

    Returns:
        A one-element list. On success the item has metadata.is_text = True
        and metadata.format = "csv" (plus metadata.encoding when a fallback
        encoding was needed); otherwise the raw bytes are returned with
        metadata.is_text = False.
    """
    try:
        csv_content = file_content.decode('utf-8')
    except UnicodeDecodeError:
        logger.warning(f"Could not decode CSV from file '{file_name}' as UTF-8, trying alternative encodings")
    else:
        return [{
            "sequence_nr": 1,
            "name": "1_csv",
            "ext": "csv",
            "content_type": "csv",
            "data": csv_content,
            "metadata": {
                "is_text": True,
                "format": "csv"
            }
        }]

    try:
        # cp1252 leaves a few byte values undefined and can therefore fail;
        # latin-1 is the catch-all and has to be the final candidate.
        for encoding in ('cp1252', 'latin-1'):
            try:
                csv_content = file_content.decode(encoding)
            except UnicodeDecodeError:
                continue
            logger.info(f"CSV successfully decoded with encoding {encoding}")
            return [{
                "sequence_nr": 1,
                "name": "1_csv",
                "ext": "csv",
                "content_type": "csv",
                "data": csv_content,
                "metadata": {
                    "is_text": True,
                    "encoding": encoding,
                    "format": "csv"
                }
            }]
    except Exception as e:
        logger.error(f"Error in alternative CSV decoding: {str(e)}")

    # Fallback to binary data
    return [{
        "sequence_nr": 1,
        "name": "1_binary",
        "ext": "csv",
        "content_type": "text/csv",
        "data": file_content,
        "metadata": {
            "is_text": False
        }
    }]
def extract_image_content(file_name: str, file_content: bytes, mime_type: str) -> List[Dict[str, Any]]:
    """
    Wrap an image file as a content item and, when the image library is
    available, attach basic image metadata plus a short text description.

    Args:
        file_name: Name of the file (not read by this function).
        file_content: Binary data of the file.
        mime_type: MIME type of the file; its subtype becomes the extension.

    Returns:
        List whose first item is the image itself (metadata.is_text = False).
        A second text item describing the image is appended when a
        description could be produced.
    """
    # The extension is the MIME subtype, with "jpeg" normalised to "jpg".
    subtype = mime_type.split('/')[-1]
    file_extension = "jpg" if subtype == "jpeg" else subtype

    details = {
        "is_text": False,
        "format": "image"
    }
    summary = None

    try:
        _load_image_processor()
        if image_processor_loaded and file_content:
            with io.BytesIO(file_content) as stream:
                try:
                    # verify() checks the file; the image is opened a second
                    # time afterwards before its attributes are read.
                    Image.open(stream).verify()
                    stream.seek(0)
                    picture = Image.open(stream)

                    details.update({
                        "format": picture.format,
                        "mode": picture.mode,
                        "width": picture.width,
                        "height": picture.height
                    })

                    # Extract EXIF data if available
                    if hasattr(picture, '_getexif') and callable(picture._getexif):
                        raw_exif = picture._getexif()
                        if raw_exif:
                            details["exif"] = {
                                f"tag_{tag_id}": str(tag_value)
                                for tag_id, tag_value in raw_exif.items()
                            }

                    summary = f"Image ({picture.width}x{picture.height}, {picture.format}, {picture.mode})"
                except Exception as inner_e:
                    logger.warning(f"Error processing image: {str(inner_e)}")
                    details["error"] = str(inner_e)
                    summary = f"Image (unable to process: {str(inner_e)})"
    except Exception as e:
        logger.warning(f"Could not extract image metadata: {str(e)}")
        details["error"] = str(e)

    # The image itself is always the first item.
    items = [{
        "sequence_nr": 1,
        "name": "1_image",
        "ext": file_extension,
        "content_type": "image",
        "data": file_content,
        "metadata": details
    }]

    # The description, when there is one, follows as a separate text item.
    if summary:
        items.append({
            "sequence_nr": 2,
            "name": "2_text_image_info",
            "ext": "txt",
            "content_type": "text",
            "data": summary,
            "metadata": {
                "is_text": True,
                "image_description": True
            }
        })
    return items
def extract_pdf_content(file_name: str, file_content: bytes) -> List[Dict[str, Any]]:
    """
    Extract text and embedded images from a PDF file.

    Text is read page by page with PyPDF2 and returned as one text item.
    Images are extracted with PyMuPDF (fitz), one item per image. If the
    libraries are unavailable or nothing could be extracted, the original
    PDF is returned as a single binary item.

    Args:
        file_name: Name of the file (not read by this function).
        file_content: Binary data of the file.

    Returns:
        List of content items (text and images) with the metadata.is_text flag.
    """
    contents = []
    try:
        # Load PDF extraction libraries
        _load_pdf_extractor()
        if not pdf_extractor_loaded:
            logger.warning("PDF extraction not possible: Libraries not available")
            # Add original file as binary content
            contents.append({
                "sequence_nr": 1,
                "name": "1_pdf",
                "ext": "pdf",
                "content_type": "application/pdf",
                "data": file_content,
                "metadata": {
                    "is_text": False,
                    "format": "pdf"
                }
            })
            return contents

        # Extract text with PyPDF2
        pdf_metadata = {}
        page_chunks = []
        with io.BytesIO(file_content) as pdf_stream:
            pdf_reader = PyPDF2.PdfReader(pdf_stream)
            # Document info keys lose their leading "/" when present.
            for key, value in (pdf_reader.metadata or {}).items():
                pdf_metadata[key[1:] if key.startswith('/') else key] = value
            # Taken while the stream is still open.
            page_count = len(pdf_reader.pages)
            for page_number, page in enumerate(pdf_reader.pages, start=1):
                page_text = page.extract_text()
                if page_text:
                    page_chunks.append(f"--- Page {page_number} ---\n{page_text}\n\n")
        extracted_text = "".join(page_chunks)

        # If text was found, add as separate content
        if extracted_text.strip():
            contents.append({
                "sequence_nr": len(contents) + 1,
                "name": f"{len(contents) + 1}_text",
                "ext": "txt",
                "content_type": "text",
                "data": extracted_text,
                "metadata": {
                    "is_text": True,
                    "source": "pdf",
                    "pages": page_count,
                    "pdf_metadata": pdf_metadata
                }
            })

        # Extract images with PyMuPDF (fitz)
        try:
            with io.BytesIO(file_content) as pdf_stream:
                doc = fitz.open(stream=pdf_stream, filetype="pdf")
                try:
                    for page_num in range(len(doc)):
                        page = doc[page_num]
                        for img_index, img_info in enumerate(page.get_images(full=True)):
                            # One unreadable image must not abort the others.
                            try:
                                xref = img_info[0]
                                base_image = doc.extract_image(xref)
                                image_ext = base_image["ext"]
                                contents.append({
                                    "sequence_nr": len(contents) + 1,
                                    "name": f"{len(contents) + 1}_image_page{page_num+1}_{img_index+1}",
                                    "ext": image_ext,
                                    "content_type": f"image/{image_ext}",
                                    "data": base_image["image"],
                                    "metadata": {
                                        "is_text": False,
                                        "source": "pdf",
                                        "page": page_num + 1,
                                        "index": img_index
                                    }
                                })
                            except Exception as img_e:
                                logger.warning(f"Error extracting image {img_index} on page {page_num + 1}: {str(img_e)}")
                finally:
                    # Close the document even when page iteration raises.
                    doc.close()
        except Exception as img_extract_e:
            logger.warning(f"Error extracting images from PDF: {str(img_extract_e)}")
    except Exception as e:
        logger.error(f"Error in PDF extraction: {str(e)}")

    # If no content was extracted, add the original PDF
    if not contents:
        contents.append({
            "sequence_nr": 1,
            "name": "1_pdf",
            "ext": "pdf",
            "content_type": "application/pdf",
            "data": file_content,
            "metadata": {
                "is_text": False,
                "format": "pdf"
            }
        })
    return contents
def extract_word_content(file_name: str, file_content: bytes, mime_type: str) -> List[Dict[str, Any]]:
    """
    Extract paragraph and table text from a Word document.

    Only the DOCX MIME type is parsed. For any other MIME type, for missing
    libraries, for a parse error or for a document without text, the
    original file is returned as a single binary item.

    Args:
        file_name: Name of the file (not read by this function).
        file_content: Binary data of the file.
        mime_type: MIME type of the file.

    Returns:
        List with either one text item (metadata.is_text = True) or the
        original document (metadata.is_text = False).
    """
    is_docx = mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    # The untouched document, used whenever no text can be produced.
    original_document = {
        "sequence_nr": 1,
        "name": "1_word",
        "ext": "docx" if is_docx else "doc",
        "content_type": mime_type,
        "data": file_content,
        "metadata": {
            "is_text": False,
            "format": "word"
        }
    }

    extracted = []
    try:
        _load_office_extractor()
        if not office_extractor_loaded:
            logger.warning("Word extraction not possible: Libraries not available")
            return [original_document]

        if not is_docx:
            logger.warning("Extraction from old Word format (DOC) not supported")
        else:
            with io.BytesIO(file_content) as docx_stream:
                document = docx.Document(docx_stream)

                # Paragraphs first, then every table row as "a | b | c".
                pieces = [paragraph.text for paragraph in document.paragraphs]
                for table in document.tables:
                    for row in table.rows:
                        pieces.append(" | ".join(cell.text for cell in row.cells))

                combined = "\n\n".join(pieces)
                if combined.strip():
                    extracted.append({
                        "sequence_nr": 1,
                        "name": "1_text",
                        "ext": "txt",
                        "content_type": "text",
                        "data": combined,
                        "metadata": {
                            "is_text": True,
                            "source": "docx",
                            "paragraph_count": len(document.paragraphs),
                            "table_count": len(document.tables)
                        }
                    })
    except Exception as e:
        logger.error(f"Error in Word extraction: {str(e)}")

    # Nothing extracted: hand back the original document instead.
    if not extracted:
        extracted.append(original_document)
    return extracted
def extract_excel_content(file_name: str, file_content: bytes, mime_type: str) -> List[Dict[str, Any]]:
    """
    Convert each worksheet of an Excel workbook into a CSV text item.

    Only the XLSX MIME type is parsed. For any other MIME type, for missing
    libraries, for a parse error or for a workbook without data, the
    original file is returned as a single binary item.

    Args:
        file_name: Name of the file (not read by this function).
        file_content: Binary data of the file.
        mime_type: MIME type of the file.

    Returns:
        List with one CSV item per non-empty sheet (metadata.is_text = True)
        or the original document (metadata.is_text = False).
    """
    is_xlsx = mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

    # The untouched workbook, used whenever no sheet can be produced.
    original_workbook = {
        "sequence_nr": 1,
        "name": "1_excel",
        "ext": "xlsx" if is_xlsx else "xls",
        "content_type": mime_type,
        "data": file_content,
        "metadata": {
            "is_text": False,
            "format": "excel"
        }
    }

    sheets = []
    try:
        _load_office_extractor()
        if not office_extractor_loaded:
            logger.warning("Excel extraction not possible: Libraries not available")
            return [original_workbook]

        if not is_xlsx:
            logger.warning("Extraction from old Excel format (XLS) not supported")
        else:
            with io.BytesIO(file_content) as xlsx_stream:
                workbook = openpyxl.load_workbook(xlsx_stream, data_only=True)
                for sheet_name in workbook.sheetnames:
                    worksheet = workbook[sheet_name]

                    # Every field is quoted; embedded quotes are doubled and
                    # empty cells become an empty quoted field.
                    lines = []
                    for row in worksheet.iter_rows():
                        fields = []
                        for cell in row:
                            raw = cell.value
                            text = "" if raw is None else str(raw).replace('"', '""')
                            fields.append(f'"{text}"')
                        lines.append(','.join(fields))
                    csv_content = "\n".join(lines)

                    if not csv_content.strip():
                        continue

                    position = len(sheets) + 1
                    sheet_safe_name = sheet_name.replace(" ", "_").replace("/", "_").replace("\\", "_")
                    sheets.append({
                        "sequence_nr": position,
                        "name": f"{position}_csv_{sheet_safe_name}",
                        "ext": "csv",
                        "content_type": "csv",
                        "data": csv_content,
                        "metadata": {
                            "is_text": True,
                            "source": "xlsx",
                            "sheet": sheet_name,
                            "format": "csv"
                        }
                    })
    except Exception as e:
        logger.error(f"Error in Excel extraction: {str(e)}")

    # Nothing extracted: hand back the original workbook instead.
    if not sheets:
        sheets.append(original_workbook)
    return sheets
def extract_powerpoint_content(file_name: str, file_content: bytes, mime_type: str) -> List[Dict[str, Any]]:
    """
    Wrap a PowerPoint presentation as a single binary content item.

    No slide content is extracted; the original bytes are passed through.

    Args:
        file_name: Name of the file (not read by this function).
        file_content: Binary data of the file.
        mime_type: MIME type of the file; decides between "pptx" and "ppt".

    Returns:
        One-element list holding the presentation with metadata.is_text = False.
    """
    pptx_mime = "application/vnd.openxmlformats-officedocument.presentationml.presentation"
    if mime_type == pptx_mime:
        file_extension = "pptx"
    else:
        file_extension = "ppt"

    presentation = {
        "sequence_nr": 1,
        "name": "1_powerpoint",
        "ext": file_extension,
        "content_type": mime_type,
        "data": file_content,
        "metadata": {
            "is_text": False,
            "format": "powerpoint"
        }
    }
    return [presentation]
def extract_binary_content(file_name: str, file_content: bytes, mime_type: str) -> List[Dict[str, Any]]:
    """
    Wrap a file of unknown type as a single binary content item.

    Args:
        file_name: Name of the file; its extension is kept ("bin" if none).
        file_content: Binary data of the file.
        mime_type: MIME type of the file, passed through as content_type.

    Returns:
        One-element list holding the file with metadata.is_text = False.
    """
    # splitext keeps the leading dot, which is stripped here.
    suffix = os.path.splitext(file_name)[1]
    file_extension = suffix[1:] if suffix else "bin"

    blob = {
        "sequence_nr": 1,
        "name": "1_binary",
        "ext": file_extension,
        "content_type": mime_type,
        "data": file_content,
        "metadata": {
            "is_text": False,
            "format": "binary"
        }
    }
    return [blob]