gateway/modules/services/serviceExtraction/formats/xlsx_extractor.py
2025-10-03 01:41:50 +02:00

94 lines
3.3 KiB
Python

from typing import Any, Dict, List
import io
from datetime import datetime
from ..subUtils import makeId
from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Extractor
class XlsxExtractor(Extractor):
    """Extractor that turns each worksheet of an XLSX/XLSM file into a CSV part.

    The result of ``extract`` is a flat list: one root "container" part
    followed by either one "table" part per worksheet (CSV text) or, when
    openpyxl cannot be imported, a single "binary" placeholder part carrying
    a warning.
    """

    # MIME type used both for detection and for the container/binary parts.
    _MIME_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

    def __init__(self):
        # _loaded: whether the optional import has been attempted already.
        # _haveLibs: whether that attempt succeeded.
        self._loaded = False
        self._haveLibs = False

    def _load(self):
        """Import openpyxl on first call and remember whether it succeeded.

        Later calls return immediately, so a failed import is not retried.
        """
        global openpyxl
        if self._loaded:
            return
        self._loaded = True
        try:
            import openpyxl
            self._haveLibs = True
        except Exception:
            # Deliberately broad: any import-time failure simply means the
            # extractor falls back to the "binary" placeholder part.
            self._haveLibs = False

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True if the file looks like an Excel workbook.

        A file matches when its MIME type is the spreadsheetml type or when
        its name ends in ``.xlsx`` / ``.xlsm`` (case-insensitive). A falsy
        ``fileName`` is treated as an empty string. ``headBytes`` is not
        inspected.
        """
        if mimeType == self._MIME_TYPE:
            return True
        return (fileName or "").lower().endswith((".xlsx", ".xlsm"))

    @staticmethod
    def _formatCell(value: Any) -> str:
        """Render one cell value as a CSV field.

        ``None`` becomes an empty field, ints/floats are written via ``str``,
        ``datetime`` values as ``YYYY-MM-DD HH:MM:SS``, and everything else is
        stringified, wrapped in double quotes, with embedded quotes doubled.
        """
        if value is None:
            return ""
        if isinstance(value, (int, float)):
            return str(value)
        if isinstance(value, datetime):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        escaped = str(value).replace('"', '""')
        return f'"{escaped}"'

    def _sheetToCsv(self, ws) -> str:
        """Serialize the bounding rectangle of worksheet ``ws`` as CSV text.

        The rectangle spans ``min_row..max_row`` by ``min_column..max_column``
        as reported by the worksheet; rows are joined with ``\\n`` and fields
        with ``,``.
        """
        rows = ws.iter_rows(
            min_row=ws.min_row,
            max_row=ws.max_row,
            min_col=ws.min_column,
            max_col=ws.max_column,
        )
        return "\n".join(
            ",".join(self._formatCell(cell.value) for cell in row)
            for row in rows
        )

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Split a workbook into a container part plus one CSV part per worksheet.

        Args:
            fileBytes: Raw bytes of the workbook file.
            context: Extraction context; not read by this extractor.

        Returns:
            A list whose first element is the root container part. If openpyxl
            is unavailable it is followed by a single "binary" part with a
            warning in its metadata; otherwise by one "table" part per
            worksheet, each holding that sheet's CSV text.

        Raises:
            Whatever ``openpyxl.load_workbook`` raises for unreadable input;
            such errors are not caught here.
        """
        self._load()
        rootId = makeId()
        parts: List[ContentPart] = [ContentPart(
            id=rootId,
            parentId=None,
            label="xlsx",
            typeGroup="container",
            mimeType=self._MIME_TYPE,
            data="",
            metadata={"size": len(fileBytes)}
        )]

        if not self._haveLibs:
            parts.append(ContentPart(
                id=makeId(),
                parentId=rootId,
                label="binary",
                typeGroup="binary",
                mimeType=self._MIME_TYPE,
                data="",
                metadata={"size": len(fileBytes), "warning": "openpyxl not available"}
            ))
            return parts

        with io.BytesIO(fileBytes) as buf:
            wb = openpyxl.load_workbook(buf, data_only=True)
            try:
                # Iterate wb.worksheets rather than wb.sheetnames: the latter
                # also lists chart sheets, which have no cell grid (no
                # min_row/max_row) and would raise AttributeError below.
                for ws in wb.worksheets:
                    sheetName = ws.title
                    csvData = self._sheetToCsv(ws)
                    parts.append(ContentPart(
                        id=makeId(),
                        parentId=rootId,
                        label=f"sheet_{sheetName}",
                        typeGroup="table",
                        mimeType="text/csv",
                        data=csvData,
                        metadata={"sheet": sheetName, "size": len(csvData.encode('utf-8'))}
                    ))
            finally:
                # Release the workbook even if a sheet fails to serialize.
                wb.close()
        return parts