gateway/modules/chat/documents/documentUtility.py
2025-07-22 18:15:02 +02:00

132 lines
No EOL
6 KiB
Python

import json
import logging
from typing import Any, Dict
# Module-level logger named after this module; the conversion and MIME
# detection helpers in this file use it to report errors they swallow.
logger = logging.getLogger(__name__)
def getFileExtension(filename: str) -> str:
    """Return the lower-cased extension of *filename* without the dot.

    Only the last path component is inspected, so a dot inside a directory
    name (``"archive.d/README"``) is not mistaken for an extension. Both
    ``/`` and ``\\`` are treated as path separators.

    Args:
        filename: File name or path. ``None`` or an empty string is accepted.

    Returns:
        The text after the last dot of the final path component, lower-cased,
        or ``''`` when there is no dot or nothing follows it. A leading-dot
        name such as ``".gitignore"`` yields ``"gitignore"``.
    """
    if not filename:
        return ''
    # Strip directory components first so dots in folder names are ignored.
    basename = filename.replace('\\', '/').rsplit('/', 1)[-1]
    _, dot, extension = basename.rpartition('.')
    return extension.lower() if dot else ''
# Built-in extension -> MIME type table used when no service is supplied.
_FALLBACK_MIME_TYPES = {
    'txt': 'text/plain',
    'md': 'text/markdown',
    'html': 'text/html',
    'css': 'text/css',
    'js': 'application/javascript',
    'json': 'application/json',
    'csv': 'text/csv',
    'xml': 'application/xml',
    'py': 'text/x-python',
    'pdf': 'application/pdf',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'png': 'image/png',
    'jpg': 'image/jpeg',
    'jpeg': 'image/jpeg',
    'gif': 'image/gif',
    'svg': 'image/svg+xml',
}


def getMimeTypeFromExtension(extension: str, service=None) -> str:
    """Return the MIME type for a file extension.

    Args:
        extension: File extension. For the built-in lookup it is matched
            case-insensitively and may carry a leading dot or surrounding
            whitespace (``".PDF"`` resolves like ``"pdf"``). ``None`` or an
            empty string is accepted.
        service: Optional object exposing ``getMimeTypeFromExtension``. When
            truthy, the lookup is delegated to it with *extension* passed
            through unchanged and its result is returned as-is.

    Returns:
        The mapped MIME type, or ``'application/octet-stream'`` when the
        built-in table has no entry for the extension.
    """
    if service:
        return service.getMimeTypeFromExtension(extension)
    # Normalize so that ".pdf", "PDF" and " pdf " all hit the same table entry.
    normalized = (extension or '').strip().lstrip('.').lower()
    return _FALLBACK_MIME_TYPES.get(normalized, 'application/octet-stream')
def detectMimeTypeFromData(file_bytes: bytes, filename: str, service=None) -> str:
    """Resolve a MIME type for raw bytes, preferring service-based detection.

    When *service* is truthy, its ``detectContentTypeFromData`` result is
    returned as long as it is truthy and not the generic
    ``'application/octet-stream'``. Otherwise the type is looked up from the
    extension of *filename* (the same *service* is forwarded to that lookup).

    Any exception raised along the way is logged as a warning and
    ``'application/octet-stream'`` is returned instead.
    """
    generic_type = 'application/octet-stream'
    try:
        sniffed = service.detectContentTypeFromData(file_bytes, filename) if service else None
        if sniffed and sniffed != generic_type:
            return sniffed
        # Content sniffing was unavailable or inconclusive: use the file name.
        return getMimeTypeFromExtension(getFileExtension(filename), service)
    except Exception as e:
        logger.warning(f"Error in MIME type detection for {filename}: {str(e)}")
        return generic_type
def detectMimeTypeFromContent(content: Any, filename: str, service=None) -> str:
    """Detect the MIME type of in-memory content of arbitrary type.

    The content is turned into bytes and handed to ``detectMimeTypeFromData``:

    * ``bytes`` / ``bytearray`` / ``memoryview`` are passed through unchanged
      (previously they were stringified, so the detector saw ``b'...'`` repr
      text instead of the real data);
    * ``None`` becomes empty bytes;
    * ``str`` is UTF-8 encoded;
    * ``dict`` and ``list`` are serialized as JSON, then UTF-8 encoded;
    * anything else is converted with ``str()`` and UTF-8 encoded.

    Args:
        content: The document content.
        filename: File name forwarded to the byte-level detection.
        service: Optional detection service forwarded unchanged.

    Returns:
        The detected MIME type, or ``'application/octet-stream'`` if the
        conversion or detection raises (the error is logged as a warning).
    """
    try:
        if isinstance(content, (bytes, bytearray, memoryview)):
            # Already binary: sniff the real bytes, not their repr.
            file_bytes = bytes(content)
        elif content is None:
            file_bytes = b''
        elif isinstance(content, str):
            file_bytes = content.encode('utf-8')
        elif isinstance(content, (dict, list)):
            file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8')
        else:
            file_bytes = str(content).encode('utf-8')
        return detectMimeTypeFromData(file_bytes, filename, service)
    except Exception as e:
        logger.warning("Error in MIME type detection for %s: %s", filename, e)
        return 'application/octet-stream'
# Extensions whose dict payloads are searched for a ready-made text field.
_TEXT_EXTENSIONS = ('txt', 'md', 'html', 'css', 'js', 'py')
# Candidate keys, in priority order, for text-like and CSV-like payloads.
_TEXT_FIELDS = ('content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data')
_CSV_FIELDS = ('table_data', 'csv_data', 'rows', 'data', 'content', 'text')


def _toPrettyJson(value: Any) -> str:
    """Serialize *value* as indented JSON, keeping non-ASCII characters."""
    return json.dumps(value, indent=2, ensure_ascii=False)


def _rowsToCsv(rows: list) -> str:
    """Serialize a list of rows to CSV text.

    If the first row is a dict, a header is written from the union of all row
    keys (first-seen order) and missing values are left empty. Otherwise each
    list/tuple row is written as-is and any other value becomes a one-column
    row, so strings are not split into characters.
    """
    import csv
    import io

    buffer = io.StringIO()
    if rows and isinstance(rows[0], dict):
        # Use every key that appears, not just the first row's, so that
        # rows with extra keys do not make DictWriter raise.
        fieldnames = list(dict.fromkeys(key for row in rows for key in row))
        writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval='')
        writer.writeheader()
        writer.writerows(rows)
    else:
        writer = csv.writer(buffer)
        writer.writerows(
            row if isinstance(row, (list, tuple)) else [row] for row in rows
        )
    return buffer.getvalue()


def _dictToString(document_data: Dict[str, Any], extension: str) -> str:
    """Pick the best string representation of a dict for *extension*."""
    if extension in _TEXT_EXTENSIONS:
        for field in _TEXT_FIELDS:
            content = document_data.get(field)
            if isinstance(content, str):
                return content
            if isinstance(content, (dict, list)):
                return _toPrettyJson(content)
    elif extension == 'csv':
        for field in _CSV_FIELDS:
            content = document_data.get(field)
            if isinstance(content, str):
                return content
            # Only a non-empty list of list/dict rows counts as tabular data;
            # anything else moves on to the next candidate field.
            if isinstance(content, list) and content and isinstance(content[0], (list, dict)):
                return _rowsToCsv(content)
    # 'json', unknown extensions, and payloads with no usable field.
    return _toPrettyJson(document_data)


def convertDocumentDataToString(document_data: Any, file_extension: str) -> str:
    """Convert document data to string content appropriate for a file type.

    Args:
        document_data: The payload. ``None`` yields ``""``; ``str`` is returned
            unchanged; ``bytes``/``bytearray`` are decoded as UTF-8 (invalid
            sequences replaced); ``dict`` and ``list`` are handled according
            to *file_extension*; anything else goes through ``str()``.
        file_extension: Target extension. Matched case-insensitively, with an
            optional leading dot and surrounding whitespace ignored.

    Returns:
        * dict + text extension (txt, md, html, css, js, py): the first
          candidate field holding a string (returned as-is) or a dict/list
          (returned as JSON); otherwise the whole dict as JSON.
        * dict + ``csv``: the first candidate field holding a string or a
          non-empty list of rows (rendered as CSV); otherwise the whole dict
          as JSON.
        * list + ``csv``: the list rendered as CSV.
        * any other dict or list: indented JSON.

        If the conversion raises, the error is logged and
        ``str(document_data)`` is returned.
    """
    try:
        if document_data is None:
            return ""
        if isinstance(document_data, str):
            return document_data
        if isinstance(document_data, (bytes, bytearray)):
            # str(bytes) would produce the "b'...'" repr rather than the text.
            return bytes(document_data).decode('utf-8', errors='replace')

        if isinstance(file_extension, str):
            extension = file_extension.strip().lstrip('.').lower()
        else:
            extension = ''

        if isinstance(document_data, dict):
            return _dictToString(document_data, extension)
        if isinstance(document_data, list):
            if extension == 'csv':
                return _rowsToCsv(document_data)
            return _toPrettyJson(document_data)
        return str(document_data)
    except Exception as e:
        logger.error("Error converting document data to string: %s", e)
        return str(document_data)