# Contains all document creation functions extracted from managerChat.py import logging import json from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC class DocumentCreator: def __init__(self, service): self.service = service def getFileExtension(self, filename: str) -> str: """Extract file extension from filename""" return self.service.getFileExtension(filename) def getMimeType(self, extension: str) -> str: """Get MIME type based on file extension""" return self.service.getMimeTypeFromExtension(extension) def detectMimeTypeFromContent(self, content: Any, filename: str) -> str: """ Detect MIME type from content and filename using service center. Only returns a detected MIME type if it's better than application/octet-stream. """ try: if isinstance(content, str): file_bytes = content.encode('utf-8') elif isinstance(content, dict): file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8') else: file_bytes = str(content).encode('utf-8') detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) if detected_mime_type != "application/octet-stream": return detected_mime_type return "application/octet-stream" except Exception as e: logging.warning(f"Error in MIME type detection for {filename}: {str(e)}") return 'application/octet-stream' def detectMimeTypeFromDocument(self, document: Any, filename: str) -> str: """ Detect MIME type from document object using service center. Only returns a detected MIME type if it's better than application/octet-stream. """ try: content = getattr(document, 'content', '') if isinstance(content, str): file_bytes = content.encode('utf-8') else: file_bytes = str(content).encode('utf-8') detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) if detected_mime_type != "application/octet-stream": return detected_mime_type return "application/octet-stream" except Exception as e: logging.warning(f"Error in MIME type detection for document {filename}: {str(e)}") return 'application/octet-stream' def convertDocumentDataToString(self, document_data: Dict[str, Any], file_extension: str) -> str: """Convert document data to string content based on file type with enhanced processing""" try: if document_data is None: return "" if isinstance(document_data, str): return document_data if isinstance(document_data, dict): if file_extension == 'json': return json.dumps(document_data, indent=2, ensure_ascii=False) elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']: text_fields = ['content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data'] for field in text_fields: if field in document_data: content = document_data[field] if isinstance(content, str): return content elif isinstance(content, (dict, list)): return json.dumps(content, indent=2, ensure_ascii=False) return json.dumps(document_data, indent=2, ensure_ascii=False) elif file_extension == 'csv': csv_fields = ['table_data', 'csv_data', 'rows', 'data', 'content', 'text'] for field in csv_fields: if field in document_data: content = document_data[field] if isinstance(content, str): return content elif isinstance(content, list): if content and isinstance(content[0], (list, dict)): import csv import io output = io.StringIO() if isinstance(content[0], dict): if content: fieldnames = content[0].keys() writer = csv.DictWriter(output, fieldnames=fieldnames) writer.writeheader() writer.writerows(content) else: writer = csv.writer(output) writer.writerows(content) return output.getvalue() return json.dumps(document_data, indent=2, ensure_ascii=False) else: return json.dumps(document_data, indent=2, ensure_ascii=False) elif isinstance(document_data, list): if file_extension == 'csv': import csv import io output = io.StringIO() if document_data and isinstance(document_data[0], dict): fieldnames = document_data[0].keys() writer = csv.DictWriter(output, fieldnames=fieldnames) writer.writeheader() writer.writerows(document_data) else: writer = csv.writer(output) writer.writerows(document_data) return output.getvalue() else: return json.dumps(document_data, indent=2, ensure_ascii=False) else: return str(document_data) except Exception as e: logging.error(f"Error converting document data to string: {str(e)}") return str(document_data)