gateway/modules/services/serviceExtraction/formats/pdf_extractor.py

110 lines
3.9 KiB
Python

import base64
import io
import logging
from typing import Any, Dict, List

from modules.datamodels.datamodelExtraction import ContentPart

from ..subRegistry import Extractor
from ..utils import makeId
class PdfExtractor(Extractor):
    """Extractor that breaks a PDF into a container part, page texts and images.

    The produced list always starts with a "container" part describing the
    whole file. Children reference it through ``parentId`` and are either:

    * one "text" part per page that yields non-blank text (via PyPDF2),
    * one "image" part per embedded image (via PyMuPDF, base64-encoded), or
    * a single "binary" part holding the base64-encoded file when the PDF
      libraries could not be imported.

    PyPDF2 and PyMuPDF are imported lazily on first use so that merely
    constructing the extractor does not require them to be installed.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        # _loaded: the library import has been attempted (successfully or not).
        # _haveLibs: both PyPDF2 and PyMuPDF were imported successfully.
        self._loaded = False
        self._haveLibs = False

    def _load(self):
        """Import PyPDF2 and PyMuPDF once and record whether both are usable.

        The modules are bound as module-level globals because the extraction
        code refers to them by their plain names. The import is attempted only
        once per instance; a failure is remembered and not retried.
        """
        global PyPDF2, fitz
        if self._loaded:
            return
        self._loaded = True
        try:
            import PyPDF2
            import fitz  # PyMuPDF
        except Exception as exc:
            # Deliberately broad: any failure while importing means the
            # libraries cannot be used, whatever the exception type.
            self._haveLibs = False
            self._logger.warning("PDF libraries not available: %s", exc)
        else:
            self._haveLibs = True

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when the MIME type or the file extension indicates a PDF.

        ``headBytes`` is accepted for interface compatibility but not inspected.
        A missing (None/empty) ``fileName`` is treated as having no extension.
        """
        if mimeType == "application/pdf":
            return True
        return (fileName or "").lower().endswith(".pdf")

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Split a PDF into content parts.

        Args:
            fileBytes: Raw bytes of the PDF file.
            context: Extraction context; not used by this extractor.

        Returns:
            A list whose first element is the container part, followed by the
            text parts (in page order) and then the image parts. When the PDF
            libraries are unavailable, the container is followed by a single
            binary part carrying the base64-encoded file and a warning.

        Extraction is best-effort: pages or images that fail are skipped and
        logged instead of raising.
        """
        self._load()
        rootId = makeId()
        parts: List[ContentPart] = [ContentPart(
            id=rootId,
            parentId=None,
            label="pdf",
            typeGroup="container",
            mimeType="application/pdf",
            data="",
            metadata={"size": len(fileBytes)},
        )]

        if not self._haveLibs:
            parts.append(ContentPart(
                id=makeId(),
                parentId=rootId,
                label="binary",
                typeGroup="binary",
                mimeType="application/pdf",
                data=base64.b64encode(fileBytes).decode("utf-8"),
                metadata={"size": len(fileBytes), "warning": "PDF libs not available"},
            ))
            return parts

        parts.extend(self._extractTextParts(fileBytes, rootId))
        parts.extend(self._extractImageParts(fileBytes, rootId))
        return parts

    def _extractTextParts(self, fileBytes: bytes, rootId: str) -> List[ContentPart]:
        """Return one text part per page with non-blank text, read with PyPDF2."""
        found: List[ContentPart] = []
        try:
            with io.BytesIO(fileBytes) as buf:
                reader = PyPDF2.PdfReader(buf)
                for pageIndex, page in enumerate(reader.pages):
                    try:
                        text = page.extract_text() or ""
                        if not text.strip():
                            continue
                        found.append(ContentPart(
                            id=makeId(),
                            parentId=rootId,
                            label=f"page_{pageIndex + 1}",
                            typeGroup="text",
                            mimeType="text/plain",
                            data=text,
                            metadata={
                                "pages": 1,
                                "pageIndex": pageIndex,
                                "size": len(text.encode("utf-8")),
                            },
                        ))
                    except Exception as exc:
                        # One unreadable page must not discard the others.
                        self._logger.debug(
                            "Skipping text of PDF page %d: %s", pageIndex + 1, exc
                        )
        except Exception as exc:
            # Keep whatever was collected before the failure.
            self._logger.warning("PDF text extraction failed: %s", exc)
        return found

    def _extractImageParts(self, fileBytes: bytes, rootId: str) -> List[ContentPart]:
        """Return one base64-encoded image part per embedded image, read with PyMuPDF."""
        found: List[ContentPart] = []
        try:
            # PyMuPDF accepts the raw bytes directly as an in-memory stream.
            doc = fitz.open(stream=fileBytes, filetype="pdf")
            try:
                for pageIndex, page in enumerate(doc):
                    for imageIndex, img in enumerate(page.get_images(full=True)):
                        try:
                            # The first item of each image entry is its xref.
                            baseImage = doc.extract_image(img[0])
                            if not baseImage:
                                continue
                            imgBytes = baseImage.get("image", b"")
                            if not imgBytes:
                                continue
                            ext = baseImage.get("ext", "png")
                            found.append(ContentPart(
                                id=makeId(),
                                parentId=rootId,
                                label=f"image_{pageIndex + 1}_{imageIndex}",
                                typeGroup="image",
                                mimeType=f"image/{ext}",
                                data=base64.b64encode(imgBytes).decode("utf-8"),
                                metadata={"pageIndex": pageIndex, "size": len(imgBytes)},
                            ))
                        except Exception as exc:
                            # One broken image must not discard the others.
                            self._logger.debug(
                                "Skipping image %d on PDF page %d: %s",
                                imageIndex, pageIndex + 1, exc,
                            )
            finally:
                # Release the document even when a page-level call raised.
                doc.close()
        except Exception as exc:
            # Keep whatever was collected before the failure.
            self._logger.warning("PDF image extraction failed: %s", exc)
        return found