# gateway/modules/serviceCenter/services/serviceExtraction/extractors/extractorPdf.py
# 2026-03-15 23:38:21 +01:00
#
# 179 lines
# 7.1 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import base64
import io
import logging
from typing import Any, Dict, List

from ..subUtils import makeId
from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Extractor
_logger = logging.getLogger(__name__)


class PdfExtractor(Extractor):
    """
    Extractor for PDF files.

    Supported formats:
    - MIME types: application/pdf
    - File extensions: .pdf
    - Special handling: Extracts text per page and embedded images
    - Dependencies: PyPDF2, PyMuPDF (fitz)

    Both libraries are imported lazily on the first ``extract`` call. If
    either import fails, ``extract`` returns the raw file as a single
    base64-encoded binary part instead of text/image parts.
    """

    def __init__(self):
        # _loaded: the lazy import has been attempted (whether or not it worked).
        # _haveLibs: both PyPDF2 and PyMuPDF were imported successfully.
        self._loaded = False
        self._haveLibs = False

    def _load(self):
        """Import PyPDF2 and PyMuPDF once and record whether both are usable."""
        global PyPDF2, fitz
        if self._loaded:
            return
        self._loaded = True
        try:
            import PyPDF2
            import fitz  # PyMuPDF
            self._haveLibs = True
        except Exception as exc:
            # Deliberately broad: any import-time failure must degrade to the
            # binary fallback rather than crash extraction.
            self._haveLibs = False
            _logger.warning(
                "PDF libraries not available, falling back to binary part: %s", exc
            )

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True for the PDF MIME type or a ``.pdf`` file name.

        ``fileName`` may be None or empty; ``headBytes`` is not inspected.
        """
        if mimeType == "application/pdf":
            return True
        return (fileName or "").lower().endswith(".pdf")

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions."""
        return [".pdf"]

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        return ["application/pdf"]

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Split a PDF into a container part, per-page text parts and image parts.

        Args:
            fileBytes: Raw content of the PDF file.
            context: Extraction context; only ``fileName`` is read (used as
                ``contextRef.containerPath``, default ``"document.pdf"``).
                ``None`` is treated like an empty dict.

        Returns:
            A list whose first element is the root ``container`` part. It is
            followed by one ``text`` part per page with non-blank text
            (PyMuPDF first, PyPDF2 only if PyMuPDF produced none) and then one
            ``image`` part per extractable embedded image. If the PDF
            libraries are missing, the root is followed by a single ``binary``
            part holding the base64-encoded file.

        Extraction is best-effort: failures on individual pages or images are
        logged and skipped, never raised.
        """
        self._load()
        rootId = makeId()
        parts: List[ContentPart] = [ContentPart(
            id=rootId,
            parentId=None,
            label="pdf",
            typeGroup="container",
            mimeType="application/pdf",
            data="",
            metadata={"size": len(fileBytes)}
        )]

        if not self._haveLibs:
            parts.append(ContentPart(
                id=makeId(),
                parentId=rootId,
                label="binary",
                typeGroup="binary",
                mimeType="application/pdf",
                data=base64.b64encode(fileBytes).decode("utf-8"),
                metadata={"size": len(fileBytes), "warning": "PDF libs not available"}
            ))
            return parts

        containerPath = (context or {}).get("fileName", "document.pdf")

        # Open the document once and reuse it for text and images; the
        # finally clause guarantees it is closed on every path.
        doc = self._openDocument(fileBytes)
        try:
            # Extract text per page with PyMuPDF (same lib as in-place search -
            # ensures extraction matches PDF text layer)
            textParts: List[ContentPart] = []
            if doc is not None:
                textParts = self._extractTextWithFitz(doc, rootId, containerPath)
            # Fallback to PyPDF2 if PyMuPDF text extraction failed or returned nothing
            if not textParts:
                textParts = self._extractTextWithPyPdf2(fileBytes, rootId, containerPath)
            parts.extend(textParts)

            # Extract images with PyMuPDF
            if doc is not None:
                parts.extend(self._extractImages(doc, rootId, containerPath))
        finally:
            if doc is not None:
                try:
                    doc.close()
                except Exception as exc:
                    _logger.warning("Closing PDF document failed: %s", exc)
        return parts

    def _openDocument(self, fileBytes: bytes):
        """Open the PDF with PyMuPDF; return the document or None on failure."""
        try:
            return fitz.open(stream=fileBytes, filetype="pdf")
        except Exception as exc:
            _logger.warning("PyMuPDF could not open PDF: %s", exc)
            return None

    @staticmethod
    def _buildTextPart(rootId: str, pageIndex: int, text: str, containerPath: Any) -> ContentPart:
        """Build the text part for one page (``pageIndex`` is zero-based)."""
        return ContentPart(
            id=makeId(),
            parentId=rootId,
            label=f"page_{pageIndex+1}",
            typeGroup="text",
            mimeType="text/plain",
            data=text,
            metadata={
                "pages": 1, "pageIndex": pageIndex,
                "size": len(text.encode('utf-8')),
                "contextRef": {
                    "containerPath": containerPath,
                    "location": f"page:{pageIndex+1}",
                    "pageIndex": pageIndex,
                },
            }
        )

    def _extractTextWithFitz(self, doc, rootId: str, containerPath: Any) -> List[ContentPart]:
        """Return text parts for every page of ``doc`` that has non-blank text."""
        found: List[ContentPart] = []
        try:
            pageCount = len(doc)
        except Exception as exc:
            _logger.warning("PyMuPDF could not determine page count: %s", exc)
            return found
        # Index access (not iteration) so one unreadable page is skipped
        # without ending the walk over the remaining pages.
        for i in range(pageCount):
            try:
                text = doc[i].get_text() or ""
                if text.strip():
                    found.append(self._buildTextPart(rootId, i, text, containerPath))
            except Exception as exc:
                _logger.warning("PyMuPDF text extraction failed on page %d: %s", i + 1, exc)
        return found

    def _extractTextWithPyPdf2(self, fileBytes: bytes, rootId: str, containerPath: Any) -> List[ContentPart]:
        """Return text parts extracted with PyPDF2; parts found before a failure are kept."""
        found: List[ContentPart] = []
        try:
            with io.BytesIO(fileBytes) as buf:
                reader = PyPDF2.PdfReader(buf)
                for i, page in enumerate(reader.pages):
                    try:
                        text = page.extract_text() or ""
                        if text.strip():
                            found.append(self._buildTextPart(rootId, i, text, containerPath))
                    except Exception as exc:
                        _logger.warning(
                            "PyPDF2 text extraction failed on page %d: %s", i + 1, exc
                        )
        except Exception as exc:
            _logger.warning("PyPDF2 could not read PDF: %s", exc)
        return found

    def _extractImages(self, doc, rootId: str, containerPath: Any) -> List[ContentPart]:
        """Return one base64 image part per extractable image referenced by a page."""
        found: List[ContentPart] = []
        try:
            pageCount = len(doc)
        except Exception as exc:
            _logger.warning("PyMuPDF could not determine page count: %s", exc)
            return found
        for i in range(pageCount):
            # A page whose image list cannot be read is skipped on its own,
            # so later pages are still processed.
            try:
                images = doc[i].get_images(full=True)
            except Exception as exc:
                _logger.warning("Listing images failed on page %d: %s", i + 1, exc)
                continue
            for j, img in enumerate(images):
                try:
                    baseImage = doc.extract_image(img[0])  # img[0] is the xref
                    if not baseImage:
                        continue
                    imgBytes = baseImage.get("image", b"")
                    if not imgBytes:
                        continue
                    ext = baseImage.get("ext", "png")
                    found.append(ContentPart(
                        id=makeId(),
                        parentId=rootId,
                        label=f"image_{i+1}_{j}",
                        typeGroup="image",
                        mimeType=f"image/{ext}",
                        data=base64.b64encode(imgBytes).decode("utf-8"),
                        metadata={
                            "pageIndex": i, "size": len(imgBytes),
                            "contextRef": {
                                "containerPath": containerPath,
                                "location": f"page:{i+1}/image:{j}",
                                "pageIndex": i,
                            },
                        }
                    ))
                except Exception as exc:
                    _logger.warning(
                        "Image extraction failed on page %d, image %d: %s", i + 1, j, exc
                    )
        return found