# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any, Dict, List
import io
from datetime import datetime

from ..subUtils import makeId
from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Extractor


class XlsxExtractor(Extractor):
    """
    Extractor for Microsoft Excel spreadsheets.

    Supported formats:
    - MIME types: application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
    - File extensions: .xlsx, .xlsm
    - Special handling: Extracts all worksheets as CSV data (chart sheets are skipped)
    - Dependencies: openpyxl
    """

    # Single source of truth for the MIME type used in detection and in emitted parts.
    _MIME_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    _EXTENSIONS = (".xlsx", ".xlsm")

    def __init__(self):
        self._loaded = False
        self._haveLibs = False

    def _load(self):
        """Import openpyxl lazily, once, and record whether it is available."""
        if self._loaded:
            return
        self._loaded = True
        try:
            global openpyxl
            import openpyxl
            self._haveLibs = True
        except Exception:
            # Deliberate best-effort: any import failure downgrades extract()
            # to the "binary" fallback part instead of raising.
            self._haveLibs = False

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when the MIME type or the file extension identifies an Excel workbook."""
        if mimeType == self._MIME_TYPE:
            return True
        # fileName may be None or empty; treat that as "no extension".
        return (fileName or "").lower().endswith(self._EXTENSIONS)

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions."""
        return list(self._EXTENSIONS)

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        return [self._MIME_TYPE]

    @staticmethod
    def _formatCell(value: Any) -> str:
        """
        Render one cell value as a CSV field.

        None becomes an empty field, numbers are written bare, datetimes are
        written as "YYYY-MM-DD HH:MM:SS", and everything else is stringified,
        wrapped in double quotes, with embedded double quotes doubled.
        """
        if value is None:
            return ""
        if isinstance(value, (int, float)):
            return str(value)
        if isinstance(value, datetime):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        escaped = str(value).replace('"', '""')
        return f'"{escaped}"'

    def _sheetToCsv(self, ws) -> str:
        """Serialize the rectangular min/max data region of a worksheet to CSV text."""
        rows = ws.iter_rows(
            min_row=ws.min_row,
            max_row=ws.max_row,
            min_col=ws.min_column,
            max_col=ws.max_column,
            values_only=True,
        )
        return "\n".join(
            ",".join(self._formatCell(value) for value in row) for row in rows
        )

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """
        Extract every worksheet of an Excel workbook as a CSV table part.

        Args:
            fileBytes: Raw bytes of the workbook file.
            context: Extraction context; not used by this extractor.

        Returns:
            A list starting with a "container" root part, followed by one
            "table" part (mimeType "text/csv") per worksheet. When openpyxl
            is not available, a single "binary" part carrying a warning is
            returned after the root part instead.
        """
        self._load()

        rootId = makeId()
        parts: List[ContentPart] = [
            ContentPart(
                id=rootId,
                parentId=None,
                label="xlsx",
                typeGroup="container",
                mimeType=self._MIME_TYPE,
                data="",
                metadata={"size": len(fileBytes)},
            )
        ]

        if not self._haveLibs:
            parts.append(ContentPart(
                id=makeId(),
                parentId=rootId,
                label="binary",
                typeGroup="binary",
                mimeType=self._MIME_TYPE,
                data="",
                metadata={"size": len(fileBytes), "warning": "openpyxl not available"},
            ))
            return parts

        with io.BytesIO(fileBytes) as buf:
            # data_only=True is passed so openpyxl yields stored cell values
            # rather than formula text.
            wb = openpyxl.load_workbook(buf, data_only=True)
            # Iterate wb.worksheets rather than wb.sheetnames: the latter also
            # lists chart sheets, which have no cell grid (no min_row/cell())
            # and would abort the whole extraction with an AttributeError.
            for ws in wb.worksheets:
                sheetName = ws.title
                csvData = self._sheetToCsv(ws)
                parts.append(ContentPart(
                    id=makeId(),
                    parentId=rootId,
                    label=f"sheet_{sheetName}",
                    typeGroup="table",
                    mimeType="text/csv",
                    data=csvData,
                    metadata={"sheet": sheetName, "size": len(csvData.encode('utf-8'))},
                ))

        return parts