gateway/modules/serviceCenter/services/serviceKnowledge/subPreScan.py
2026-03-29 21:55:09 +02:00

429 lines
13 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Structure Pre-Scan: fast, AI-free document analysis.
Extracts TOC, headings, page map, image positions, and structural metadata
from documents. Used as the first step in the auto-index pipeline.
Supported formats:
- PDF: TOC, heading detection (font-size heuristic), page map, image positions
- DOCX: heading styles, paragraph map
- PPTX: slide titles, slide map
- XLSX: sheet names, row/column counts
- Other: minimal index (single content object = the file itself)
"""
import io
import logging
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelKnowledge import FileContentIndex
from modules.datamodels.datamodelContent import ContentObjectSummary, ContentContextRef
logger = logging.getLogger(__name__)
async def preScanDocument(
    fileData: bytes,
    mimeType: str,
    fileId: str,
    fileName: str = "",
    userId: str = "",
    featureInstanceId: str = "",
    mandateId: str = "",
    scope: str = "personal",
) -> FileContentIndex:
    """Build the structural FileContentIndex of a file without any AI call.

    The scanner is looked up by exact MIME type first and, when that is
    unknown, by the lower-cased text after the last dot of ``fileName``.
    Files matching neither are handled by the minimal single-object scanner.

    A failing scanner does not propagate its exception: the error is logged
    and the returned index carries ``structure={"error": ...}``, no content
    objects, and the raw byte length as ``totalSize``.
    """
    scanFn = _SCANNER_MAP.get(mimeType)
    if scanFn is None:
        # Unknown MIME type: decide by file extension instead.
        _, dot, suffix = fileName.rpartition(".")
        extension = suffix.lower() if dot else ""
        scanFn = _EXTENSION_SCANNER_MAP.get(extension, _scanMinimal)

    try:
        scanResult = await scanFn(fileData, fileName)
        structure, objectSummary, totalObjects, totalSize = scanResult
    except Exception as e:
        logger.error(f"Pre-scan failed for {fileName} ({mimeType}): {e}")
        structure, objectSummary = {"error": str(e)}, []
        totalObjects, totalSize = 0, len(fileData)

    # Summaries are stored as plain dicts on the index.
    serialisedSummaries = [s.model_dump() for s in objectSummary]
    return FileContentIndex(
        id=fileId,
        userId=userId,
        featureInstanceId=featureInstanceId,
        mandateId=mandateId,
        scope=scope,
        fileName=fileName,
        mimeType=mimeType,
        totalObjects=totalObjects,
        totalSize=totalSize,
        structure=structure,
        objectSummary=serialisedSummaries,
        status="extracted",
    )
# ---------------------------------------------------------------------------
# PDF scanner
# ---------------------------------------------------------------------------
async def _scanPdf(fileData: bytes, fileName: str):
    """Scan a PDF and return ``(structure, summaries, totalObjects, totalSize)``.

    For every page this records the detected heading texts, the image count,
    the plain-text length and the table heuristic result in ``pageMap``. One
    "text" summary is emitted per page that has text and one "image" summary
    per image on the page. ``totalSize`` is the sum of the page text lengths.

    Falls back to the single-object structure when PyMuPDF is not installed.
    """
    try:
        import fitz
    except ImportError:
        logger.warning("PyMuPDF not installed -- PDF pre-scan unavailable")
        return _fallbackStructure(fileData, fileName)

    pageMap: List[Dict[str, Any]] = []
    summaries: List[ContentObjectSummary] = []
    totalSize = 0
    objIndex = 0

    doc = fitz.open(stream=fileData, filetype="pdf")
    try:
        toc = doc.get_toc()
        for i, page in enumerate(doc):
            textLen = len(page.get_text())

            headings = []
            for block in page.get_text("dict", flags=0).get("blocks", []):
                # Only blocks of type 0 are inspected for heading spans.
                if block.get("type") != 0:
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        if not _isHeading(span):
                            continue
                        headingText = span.get("text", "").strip()
                        # A large-font span holding only whitespace is not a
                        # heading; recording "" would later yield a section
                        # with an empty title.
                        if headingText:
                            headings.append(headingText)

            imageCount = len(page.get_images(full=True))
            pageMap.append({
                "pageIndex": i,
                "headings": headings,
                "hasImages": imageCount > 0,
                "imageCount": imageCount,
                "textLength": textLen,
                "hasTable": _detectTableHeuristic(page),
            })

            if textLen > 0:
                summaries.append(ContentObjectSummary(
                    id=f"co-{objIndex}",
                    contentType="text",
                    contextRef=ContentContextRef(
                        containerPath=fileName,
                        location=f"page:{i+1}",
                        pageIndex=i,
                    ),
                    charCount=textLen,
                ))
                totalSize += textLen
                objIndex += 1

            for j in range(imageCount):
                summaries.append(ContentObjectSummary(
                    id=f"co-{objIndex}",
                    contentType="image",
                    contextRef=ContentContextRef(
                        containerPath=fileName,
                        location=f"page:{i+1}/image:{j}",
                        pageIndex=i,
                    ),
                ))
                objIndex += 1

        sections = _buildSectionsFromTocOrHeadings(toc, pageMap)
    finally:
        # Release the document even when a page fails to parse.
        doc.close()

    structure = {
        "pages": len(pageMap),
        "toc": toc,
        "sections": sections,
        "pageMap": pageMap,
        "imageCount": sum(p.get("imageCount", 0) for p in pageMap),
        "tableCount": sum(1 for p in pageMap if p.get("hasTable")),
    }
    return structure, summaries, len(summaries), totalSize
def _isHeading(span: Dict) -> bool:
"""Heuristic: heading if font size >= 14 or bold + size >= 12."""
size = span.get("size", 0)
flags = span.get("flags", 0)
isBold = bool(flags & (1 << 4))
return size >= 14 or (isBold and size >= 12)
def _detectTableHeuristic(page) -> bool:
"""Detect tables by looking for grid-like line patterns."""
try:
drawings = page.get_drawings()
lineCount = sum(1 for d in drawings if d.get("type") == "l")
return lineCount >= 6
except Exception:
return False
def _buildSectionsFromTocOrHeadings(
toc: list, pageMap: List[Dict]
) -> List[Dict[str, Any]]:
"""Build section boundaries from TOC or heading data."""
sections: List[Dict[str, Any]] = []
if toc:
for i, entry in enumerate(toc):
level, title, pageNum = entry[0], entry[1], entry[2]
endPage = toc[i + 1][2] - 1 if i + 1 < len(toc) else len(pageMap) - 1
sections.append({
"id": f"section-{i}",
"title": title,
"level": level,
"startPage": pageNum - 1,
"endPage": endPage,
})
else:
currentSection = None
for pm in pageMap:
headings = pm.get("headings", [])
if headings:
if currentSection:
currentSection["endPage"] = pm["pageIndex"] - 1
sections.append(currentSection)
currentSection = {
"id": f"section-{len(sections)}",
"title": headings[0],
"level": 1,
"startPage": pm["pageIndex"],
"endPage": pm["pageIndex"],
}
elif currentSection:
currentSection["endPage"] = pm["pageIndex"]
if currentSection:
sections.append(currentSection)
return sections
# ---------------------------------------------------------------------------
# DOCX scanner
# ---------------------------------------------------------------------------
async def _scanDocx(fileData: bytes, fileName: str):
    """Scan a DOCX and return ``(structure, summaries, totalObjects, totalSize)``.

    A paragraph whose style name contains "heading" and whose text is not
    blank opens a new section; its level is the first digit in the style name
    (1 when there is none). Every non-blank paragraph, headings included,
    yields a "text" summary tagged with the id of the section it falls in
    ("body" before the first heading). Each table yields one further "text"
    summary without a character count.

    Falls back to the single-object structure when python-docx is missing.
    """
    try:
        import docx
    except ImportError:
        return _fallbackStructure(fileData, fileName)

    document = docx.Document(io.BytesIO(fileData))
    paragraphs = document.paragraphs
    tables = document.tables

    summaries: List[ContentObjectSummary] = []
    sections: List[Dict[str, Any]] = []
    totalSize = 0
    openSection = None

    for paraIndex, paragraph in enumerate(paragraphs):
        text = paragraph.text or ""
        hasText = bool(text.strip())
        styleName = (paragraph.style.name or "").lower() if paragraph.style else ""

        if hasText and "heading" in styleName:
            if openSection:
                sections.append(openSection)
            openSection = {
                "id": f"section-{len(sections)}",
                "title": text.strip(),
                # First digit of the style name, e.g. "heading 2" -> 2.
                "level": next((int(ch) for ch in styleName if ch.isdigit()), 1),
                "startParagraph": paraIndex,
                "endParagraph": paraIndex,
            }
        elif openSection:
            openSection["endParagraph"] = paraIndex

        if not hasText:
            continue
        # Object ids are consecutive, so the next id equals the current count.
        summaries.append(ContentObjectSummary(
            id=f"co-{len(summaries)}",
            contentType="text",
            contextRef=ContentContextRef(
                containerPath=fileName,
                location=f"paragraph:{paraIndex+1}",
                sectionId=openSection["id"] if openSection else "body",
            ),
            charCount=len(text),
        ))
        totalSize += len(text)

    if openSection:
        sections.append(openSection)

    for tableNumber, _table in enumerate(tables, start=1):
        summaries.append(ContentObjectSummary(
            id=f"co-{len(summaries)}",
            contentType="text",
            contextRef=ContentContextRef(
                containerPath=fileName,
                location=f"table:{tableNumber}",
            ),
        ))

    structure = {
        "paragraphs": len(paragraphs),
        "tables": len(tables),
        "sections": sections,
    }
    return structure, summaries, len(summaries), totalSize
# ---------------------------------------------------------------------------
# PPTX scanner
# ---------------------------------------------------------------------------
async def _scanPptx(fileData: bytes, fileName: str):
    """Scan a PPTX and return ``(structure, summaries, totalObjects, totalSize)``.

    For each slide the text length is summed over all shapes exposing a
    ``text`` attribute, the title is the first non-empty text-frame text
    (stripped, at most 80 characters), and shapes whose ``shape_type`` equals
    13 are counted as images. Slides with text yield one "text" summary each.

    Falls back to the single-object structure when python-pptx is missing.
    """
    try:
        from pptx import Presentation
    except ImportError:
        return _fallbackStructure(fileData, fileName)

    pictureShapeType = 13
    presentation = Presentation(io.BytesIO(fileData))

    summaries: List[ContentObjectSummary] = []
    slideMap: List[Dict[str, Any]] = []
    totalSize = 0

    for slideIndex, slide in enumerate(presentation.slides):
        slideTitle = ""
        slideTextLen = 0
        pictureCount = 0

        for shape in slide.shapes:
            if hasattr(shape, "text"):
                shapeText = shape.text
                slideTextLen += len(shapeText)
                if shape.has_text_frame and not slideTitle:
                    slideTitle = shapeText.strip()[:80]
            if shape.shape_type == pictureShapeType:
                pictureCount += 1

        slideMap.append({
            "slideIndex": slideIndex,
            "title": slideTitle,
            "textLength": slideTextLen,
            "imageCount": pictureCount,
        })

        if slideTextLen <= 0:
            continue
        # Object ids are consecutive, so the next id equals the current count.
        summaries.append(ContentObjectSummary(
            id=f"co-{len(summaries)}",
            contentType="text",
            contextRef=ContentContextRef(
                containerPath=fileName,
                location=f"slide:{slideIndex+1}",
                slideIndex=slideIndex,
            ),
            charCount=slideTextLen,
        ))
        totalSize += slideTextLen

    structure = {
        "slides": len(presentation.slides),
        "slideMap": slideMap,
    }
    return structure, summaries, len(summaries), totalSize
# ---------------------------------------------------------------------------
# XLSX scanner
# ---------------------------------------------------------------------------
async def _scanXlsx(fileData: bytes, fileName: str):
    """Scan an XLSX/XLSM and return ``(structure, summaries, totalObjects, totalSize)``.

    Every sheet contributes one ``sheetMap`` entry (name, rows, columns) and
    one "text" summary. No cell is read: the character count is an estimate
    of ``rows * columns * 10``. Sheets that report no dimensions count as
    0 x 0.

    Falls back to the single-object structure when openpyxl is missing.
    """
    try:
        import openpyxl
    except ImportError:
        return _fallbackStructure(fileData, fileName)

    # Rough size estimate per cell, since cell contents are never loaded.
    approxCharsPerCell = 10

    summaries: List[ContentObjectSummary] = []
    sheetMap: List[Dict[str, Any]] = []
    totalSize = 0
    objIndex = 0

    wb = openpyxl.load_workbook(io.BytesIO(fileData), data_only=True, read_only=True)
    try:
        sheetNames = list(wb.sheetnames)
        for sheetName in sheetNames:
            ws = wb[sheetName]
            # getattr: not every sheet object exposes max_row/max_column
            # (presumably chart sheets do not -- verify against openpyxl);
            # such sheets are recorded as empty instead of failing the scan.
            rowCount = getattr(ws, "max_row", None) or 0
            colCount = getattr(ws, "max_column", None) or 0
            estimatedChars = rowCount * colCount * approxCharsPerCell

            sheetMap.append({
                "sheetName": sheetName,
                "rows": rowCount,
                "columns": colCount,
            })
            summaries.append(ContentObjectSummary(
                id=f"co-{objIndex}",
                contentType="text",
                contextRef=ContentContextRef(
                    containerPath=fileName,
                    location=f"sheet:{sheetName}",
                    sheetName=sheetName,
                ),
                charCount=estimatedChars,
            ))
            totalSize += estimatedChars
            objIndex += 1
    finally:
        # Read-only workbooks must be closed explicitly; do so on errors too.
        wb.close()

    structure = {"sheets": len(sheetNames), "sheetMap": sheetMap}
    return structure, summaries, len(summaries), totalSize
# ---------------------------------------------------------------------------
# Minimal / fallback scanner
# ---------------------------------------------------------------------------
async def _scanMinimal(fileData: bytes, fileName: str):
    """Scanner for formats without a dedicated scanner.

    Async wrapper so the single-object fallback can be awaited like the
    format-specific scanners.
    """
    fallbackResult = _fallbackStructure(fileData, fileName)
    return fallbackResult
def _fallbackStructure(fileData: bytes, fileName: str):
    """Describe the whole file as one content object of type "other".

    Returns ``(structure, summaries, totalObjects, totalSize)`` where the
    byte length of ``fileData`` is used as the structure size, the object's
    character count and the total size.
    """
    byteCount = len(fileData)
    wholeFileRef = ContentContextRef(containerPath=fileName, location="file")
    wholeFileObject = ContentObjectSummary(
        id="co-0",
        contentType="other",
        contextRef=wholeFileRef,
        charCount=byteCount,
    )
    return {"type": "single", "size": byteCount}, [wholeFileObject], 1, byteCount
# ---------------------------------------------------------------------------
# Scanner map
# ---------------------------------------------------------------------------
# Exact MIME type -> async scanner taking (fileData, fileName).
# preScanDocument consults this map first.
_SCANNER_MAP: Dict[str, Any] = {
    "application/pdf": _scanPdf,
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": _scanDocx,
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": _scanPptx,
    # Legacy binary PowerPoint is not an OOXML package, so the python-pptx
    # based scanner cannot open it and would always fail into an empty error
    # index. Index it as a single object like any other unsupported format.
    "application/vnd.ms-powerpoint": _scanMinimal,
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": _scanXlsx,
}
# Lower-cased file extension (without the dot) -> async scanner.
# preScanDocument consults this map only when the MIME type is unknown.
_EXTENSION_SCANNER_MAP: Dict[str, Any] = {
    "pdf": _scanPdf,
    "docx": _scanDocx,
    "pptx": _scanPptx,
    # Legacy binary .ppt cannot be opened by the python-pptx based scanner
    # (it only reads .pptx packages); index it as a single object instead of
    # failing into an empty error index.
    "ppt": _scanMinimal,
    "xlsx": _scanXlsx,
    "xlsm": _scanXlsx,
}