# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any, Dict, List
|
|
import io
|
|
|
|
from ..subUtils import makeId
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
from ..subRegistry import Extractor
|
|
|
|
|
|
class DocxExtractor(Extractor):
    """
    Extractor for Microsoft Word documents.

    Supported formats:
    - MIME types: application/vnd.openxmlformats-officedocument.wordprocessingml.document
    - File extensions: .docx
    - Special handling: Extracts paragraphs and tables (converts tables to CSV)
    - Dependencies: python-docx

    The python-docx import is deferred until the first call to ``extract`` so
    that merely constructing the extractor does not require the library.
    """

    # Single source of truth for the DOCX MIME type used by detection and by
    # the emitted container / fallback parts.
    _MIME_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    # Clark-notation prefix of the WordprocessingML namespace, used to tell
    # paragraph (<w:p>) and table (<w:tbl>) elements apart in the body.
    _WORD_NS = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

    def __init__(self):
        # _loaded: whether the lazy import has been attempted already.
        # _haveLibs: whether that attempt succeeded.
        self._loaded = False
        self._haveLibs = False

    def _load(self):
        """Import python-docx once and record whether it is available."""
        if self._loaded:
            return
        self._loaded = True
        try:
            # extract() refers to the module-level name ``docx``, so the
            # import has to be published as a global.
            global docx
            import docx  # python-docx
            self._haveLibs = True
        except Exception:
            # Deliberately broad: any failure while importing is treated as
            # "library not available" and extract() degrades to a binary part.
            self._haveLibs = False

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True if the MIME type or the file extension indicates DOCX.

        ``headBytes`` is accepted for interface compatibility but not inspected.
        """
        if mimeType == self._MIME_TYPE:
            return True
        return (fileName or "").lower().endswith(".docx")

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions."""
        return [".docx"]

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        return [self._MIME_TYPE]

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Split a DOCX file into a container part plus text and table parts.

        Args:
            fileBytes: Raw content of the .docx file.
            context: Extraction context; ``context["fileName"]`` (default
                ``"document.docx"``) is used as ``containerPath`` in the
                metadata. A ``None`` context is treated as empty.

        Returns:
            A list starting with the root "container" part, followed by one
            "text" part per non-blank top-level paragraph and then one "table"
            part (CSV) per non-empty top-level table. If python-docx cannot be
            imported, the list holds the root part and a single "binary" part
            carrying a warning.

        Errors raised by ``docx.Document`` for unreadable input are not
        caught here and propagate to the caller.
        """
        self._load()

        rootId = makeId()
        parts: List[ContentPart] = [ContentPart(
            id=rootId,
            parentId=None,
            label="docx",
            typeGroup="container",
            mimeType=self._MIME_TYPE,
            data="",
            metadata={"size": len(fileBytes)}
        )]

        if not self._haveLibs:
            parts.append(ContentPart(
                id=makeId(),
                parentId=rootId,
                label="binary",
                typeGroup="binary",
                mimeType=self._MIME_TYPE,
                data="",
                metadata={"size": len(fileBytes), "warning": "DOCX lib not available"}
            ))
            return parts

        fileName = (context or {}).get("fileName", "document.docx")

        with io.BytesIO(fileBytes) as buf:
            d = docx.Document(buf)

            # paragraphs
            headingIndex = 0
            currentSection = "body"
            # sectionAfter[k] is the section in effect after the first k
            # top-level paragraphs; it lets tables be assigned the section
            # they actually appear in.
            sectionAfter: List[str] = [currentSection]
            for i, para in enumerate(d.paragraphs):
                text = para.text or ""
                if text.strip():
                    styleName = (para.style.name or "").lower() if para.style else ""
                    if "heading" in styleName:
                        headingIndex += 1
                        currentSection = f"heading:{headingIndex}"
                    parts.append(self._textPart(
                        rootId, fileName, f"p_{i+1}", text,
                        f"paragraph:{i+1}", currentSection,
                    ))
                # Blank paragraphs are skipped as parts but still counted, so
                # indices line up with the body walk in _tableSections().
                sectionAfter.append(currentSection)

            # tables
            tableSections = self._tableSections(d, sectionAfter)
            for ti, table in enumerate(d.tables):
                csvData = self._tableToCsv(table)
                if not csvData:
                    continue
                # Fall back to the last section if the body walk found fewer
                # tables than python-docx reports.
                sectionId = tableSections[ti] if ti < len(tableSections) else currentSection
                parts.append(ContentPart(
                    id=makeId(),
                    parentId=rootId,
                    label=f"table_{ti+1}",
                    typeGroup="table",
                    mimeType="text/csv",
                    data=csvData,
                    metadata={
                        "size": len(csvData.encode('utf-8')),
                        "contextRef": {
                            "containerPath": fileName,
                            "location": f"table:{ti+1}",
                            "sectionId": sectionId,
                        },
                    }
                ))

        return parts

    def _textPart(self, rootId, fileName, label: str, text: str,
                  location: str, sectionId: str) -> ContentPart:
        """Build the plain-text ContentPart for one paragraph."""
        return ContentPart(
            id=makeId(),
            parentId=rootId,
            label=label,
            typeGroup="text",
            mimeType="text/plain",
            data=text,
            metadata={
                "size": len(text.encode('utf-8')),
                "contextRef": {
                    "containerPath": fileName,
                    "location": location,
                    "sectionId": sectionId,
                },
            }
        )

    def _tableSections(self, d, sectionAfter: List[str]) -> List[str]:
        """Return, for each top-level table in document order, its section id.

        Walks the direct children of the document body and counts the
        paragraphs preceding each table; that count indexes ``sectionAfter``.
        """
        paragraphTag = self._WORD_NS + "p"
        tableTag = self._WORD_NS + "tbl"
        lastIndex = len(sectionAfter) - 1
        sections: List[str] = []
        paragraphsSeen = 0
        for child in d.element.body.iterchildren():
            if child.tag == paragraphTag:
                paragraphsSeen += 1
            elif child.tag == tableTag:
                sections.append(sectionAfter[min(paragraphsSeen, lastIndex)])
        return sections

    @staticmethod
    def _tableToCsv(table) -> str:
        """Render a table as CSV with every field quoted and quotes doubled."""
        rows: List[str] = []
        for row in table.rows:
            cells = [(cell.text or "").replace('"', '""') for cell in row.cells]
            rows.append(",".join(f'"{c}"' for c in cells))
        return "\n".join(rows)
|
|
|
|
|