from typing import Any, Dict, List
import io

from ..subUtils import makeId
from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Extractor

# MIME type used both for detection and for the parts emitted for the file itself.
_DOCX_MIME_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"


class DocxExtractor(Extractor):
    """
    Extractor for Microsoft Word documents.

    Supported formats:
    - MIME types: application/vnd.openxmlformats-officedocument.wordprocessingml.document
    - File extensions: .docx
    - Special handling: Extracts paragraphs and tables (converts tables to CSV)
    - Dependencies: python-docx

    The python-docx import is attempted lazily on the first call to
    ``extract``. When the import fails, ``extract`` still returns the
    container part plus a placeholder "binary" part carrying a warning.
    """

    def __init__(self):
        # _loaded: the import has been attempted (successfully or not).
        # _haveLibs: the import succeeded and ``docx`` is usable.
        self._loaded = False
        self._haveLibs = False

    def _load(self):
        """Try to import python-docx once and record whether it worked."""
        # The module is published as a module-level global so that
        # ``extract`` can refer to ``docx`` directly.
        global docx
        if self._loaded:
            return
        # Flag is set before the attempt so a failed import is not retried.
        self._loaded = True
        try:
            import docx  # python-docx
        except Exception:
            self._haveLibs = False
        else:
            self._haveLibs = True

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """
        Return True when the MIME type is the DOCX MIME type or the file
        name ends with ".docx" (case-insensitive). ``headBytes`` is not
        inspected.
        """
        if mimeType == _DOCX_MIME_TYPE:
            return True
        loweredName = (fileName or "").lower()
        return loweredName.endswith(".docx")

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions."""
        extensions = [".docx"]
        return extensions

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        mimeTypes = [_DOCX_MIME_TYPE]
        return mimeTypes

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """
        Split a DOCX file into content parts.

        The returned list always starts with a "container" part describing
        the file. If python-docx could not be imported, a single "binary"
        part with a warning follows. Otherwise one "text" part is emitted
        per non-blank paragraph, followed by one "table" part (CSV text)
        per table that produced any CSV output. ``context`` is not used.
        """
        self._load()

        containerId = makeId()
        byteCount = len(fileBytes)
        parts: List[ContentPart] = [
            ContentPart(
                id=containerId,
                parentId=None,
                label="docx",
                typeGroup="container",
                mimeType=_DOCX_MIME_TYPE,
                data="",
                metadata={"size": byteCount},
            )
        ]

        if not self._haveLibs:
            # Without python-docx the content cannot be parsed; report the
            # file as an opaque binary child with a warning.
            parts.append(
                ContentPart(
                    id=makeId(),
                    parentId=containerId,
                    label="binary",
                    typeGroup="binary",
                    mimeType=_DOCX_MIME_TYPE,
                    data="",
                    metadata={"size": byteCount, "warning": "DOCX lib not available"},
                )
            )
            return parts

        with io.BytesIO(fileBytes) as stream:
            document = docx.Document(stream)
            # paragraphs first, then tables
            parts.extend(self._paragraphParts(document, containerId))
            parts.extend(self._tableParts(document, containerId))
        return parts

    def _paragraphParts(self, document: Any, parentId: Any):
        """Yield one "text" part for every paragraph with non-whitespace text."""
        # The label number is the paragraph's 1-based position among ALL
        # paragraphs, so skipped blank paragraphs leave gaps in the labels.
        for position, paragraph in enumerate(document.paragraphs, start=1):
            content = paragraph.text or ""
            if not content.strip():
                continue
            yield ContentPart(
                id=makeId(),
                parentId=parentId,
                label=f"p_{position}",
                typeGroup="text",
                mimeType="text/plain",
                data=content,
                metadata={"size": len(content.encode('utf-8'))},
            )

    def _tableParts(self, document: Any, parentId: Any):
        """Yield one "table" part (CSV text) for every table with CSV output."""
        for number, table in enumerate(document.tables, start=1):
            csvLines = []
            for row in table.rows:
                # Every field is quoted; embedded double quotes are doubled.
                escaped = ((cell.text or "").replace('"', '""') for cell in row.cells)
                csvLines.append(",".join(f'"{value}"' for value in escaped))
            csvData = "\n".join(csvLines)
            if not csvData:
                continue
            yield ContentPart(
                id=makeId(),
                parentId=parentId,
                label=f"table_{number}",
                typeGroup="table",
                mimeType="text/csv",
                data=csvData,
                metadata={"size": len(csvData.encode('utf-8'))},
            )