93 lines
3.3 KiB
Python
93 lines
3.3 KiB
Python
from typing import Any, Dict, List
|
|
import io
|
|
from datetime import datetime
|
|
|
|
from ..utils import makeId
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
from ..subRegistry import Extractor
|
|
|
|
|
|
class XlsxExtractor(Extractor):
    """Extractor that turns each worksheet of an Excel workbook into CSV text.

    openpyxl is imported lazily on the first call to ``extract``. When the
    import fails, extraction degrades to the container part plus a single
    "binary" placeholder part carrying a warning instead of raising.
    """

    # MIME type used for detection and for the container/placeholder parts.
    _XLSX_MIME = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

    def __init__(self):
        # _loaded: the lazy openpyxl import has been attempted (success or not).
        self._loaded = False
        # _haveLibs: the openpyxl import succeeded and sheets can be parsed.
        self._haveLibs = False

    def _load(self):
        """Attempt the openpyxl import once and record whether it succeeded."""
        # Bound as a module-level global so extract() can refer to it by name.
        global openpyxl

        if self._loaded:
            return
        self._loaded = True
        try:
            import openpyxl
            self._haveLibs = True
        except Exception:
            # Deliberately broad: any import-time failure means "no xlsx
            # support", which extract() reports as a warning part.
            self._haveLibs = False

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when the MIME type or file extension marks a workbook.

        Args:
            fileName: File name; may be empty or None. Matched
                case-insensitively against ".xlsx" and ".xlsm".
            mimeType: Declared MIME type, compared for exact equality.
            headBytes: Leading bytes of the file; not inspected here.
        """
        if mimeType == self._XLSX_MIME:
            return True
        return (fileName or "").lower().endswith((".xlsx", ".xlsm"))

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Parse a workbook into a container part plus one CSV part per worksheet.

        Args:
            fileBytes: Raw bytes of the workbook file.
            context: Extraction context; not read by this extractor.

        Returns:
            A list whose first element is the "container" part. It is followed
            by one "table" part (mimeType "text/csv") per worksheet, or by a
            single "binary" part with a warning when openpyxl is unavailable.

        Raises:
            Whatever ``openpyxl.load_workbook`` raises for unreadable input;
            such errors are not caught here.
        """
        self._load()

        parts: List[ContentPart] = []
        rootId = makeId()
        parts.append(ContentPart(
            id=rootId,
            parentId=None,
            label="xlsx",
            typeGroup="container",
            mimeType=self._XLSX_MIME,
            data="",
            metadata={"size": len(fileBytes)}
        ))

        if not self._haveLibs:
            parts.append(ContentPart(
                id=makeId(),
                parentId=rootId,
                label="binary",
                typeGroup="binary",
                mimeType=self._XLSX_MIME,
                data="",
                metadata={"size": len(fileBytes), "warning": "openpyxl not available"}
            ))
            return parts

        with io.BytesIO(fileBytes) as buf:
            # data_only=True yields cached formula results instead of formulas.
            wb = openpyxl.load_workbook(buf, data_only=True)
            # wb.worksheets skips chartsheets, which have no cell grid and
            # would fail on min_row/max_row access.
            for ws in wb.worksheets:
                sheetName = ws.title
                csvData = self._sheetToCsv(ws)
                parts.append(ContentPart(
                    id=makeId(),
                    parentId=rootId,
                    label=f"sheet_{sheetName}",
                    typeGroup="table",
                    mimeType="text/csv",
                    data=csvData,
                    metadata={"sheet": sheetName, "size": len(csvData.encode('utf-8'))}
                ))

        return parts

    @classmethod
    def _sheetToCsv(cls, ws) -> str:
        """Render the worksheet's min/max bounding rectangle as CSV text."""
        rows = ws.iter_rows(
            min_row=ws.min_row,
            max_row=ws.max_row,
            min_col=ws.min_column,
            max_col=ws.max_column,
            values_only=True,
        )
        return "\n".join(
            ",".join(cls._formatCell(value) for value in row) for row in rows
        )

    @staticmethod
    def _formatCell(v: Any) -> str:
        """Format one cell value as a CSV field."""
        if v is None:
            return ""
        # bool is an int subclass, so True/False are emitted unquoted here too.
        if isinstance(v, (int, float)):
            return str(v)
        if isinstance(v, datetime):
            return v.strftime("%Y-%m-%d %H:%M:%S")
        # Escape outside the f-string: a backslash inside an f-string
        # replacement field is a SyntaxError before Python 3.12.
        escaped = str(v).replace('"', '""')
        return f'"{escaped}"'
|
|
|
|
|