77 lines
2.9 KiB
Python
77 lines
2.9 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
from typing import Any, Dict, List
|
|
import base64
|
|
import logging
|
|
|
|
from ..subUtils import makeId
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
from ..subRegistry import Extractor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ImageExtractor(Extractor):
    """
    Extractor for image files.

    Supported formats:
    - MIME types: image/jpeg, image/png, image/gif, image/webp, image/bmp, image/tiff
    - File extensions: .jpg, .jpeg, .png, .gif, .webp, .bmp, .tiff
    - Special handling: GIF files are converted to PNG during extraction
    """

    # Single source of truth for the advertised formats, shared by detect()
    # and the getSupported*() accessors so the lists cannot drift apart.
    _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff")
    _IMAGE_MIME_TYPES = (
        "image/jpeg",
        "image/png",
        "image/gif",
        "image/webp",
        "image/bmp",
        "image/tiff",
    )

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when the file looks like an image.

        A file matches when its MIME type starts with ``image/`` (any image
        subtype, not only the advertised ones) or when its name ends with one
        of the supported extensions (case-insensitive). ``headBytes`` is
        accepted for interface compatibility but is not inspected.
        """
        if (mimeType or "").startswith("image/"):
            return True
        return (fileName or "").lower().endswith(self._IMAGE_EXTENSIONS)

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions."""
        # Return a fresh list so callers may mutate the result safely.
        return list(self._IMAGE_EXTENSIONS)

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        # Return a fresh list so callers may mutate the result safely.
        return list(self._IMAGE_MIME_TYPES)

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Wrap the image bytes in a single base64-encoded ContentPart.

        Args:
            fileBytes: Raw bytes of the image file.
            context: Extraction context; the keys read here are ``mimeType``
                (falls back to ``image/unknown`` when missing or empty) and
                ``fileName`` (used only in log messages).

        Returns:
            A one-element list holding a ContentPart with ``typeGroup``
            ``"image"``, the (possibly converted) MIME type, the base64 data
            and the byte size of the payload that was actually encoded.

        GIF input (``mimeType`` equal to ``image/gif``, case-insensitive) is
        re-encoded as PNG via Pillow. The conversion is best effort: on any
        failure, including Pillow not being installed, a warning is logged and
        the original GIF bytes and MIME type are returned unchanged.
        """
        mimeType = context.get("mimeType") or "image/unknown"
        # Only used for logging; tolerate an explicit None in the context.
        fileName = context.get("fileName") or ""

        # Convert GIF to PNG during extraction
        if mimeType.lower() == "image/gif":
            try:
                from PIL import Image
                import io

                with Image.open(io.BytesIO(fileBytes)) as gifImage:
                    # Palette / alpha modes are flattened to RGB (this drops
                    # transparency). save() is called without save_all, so a
                    # single frame is written and any animation is removed.
                    if gifImage.mode in ('RGBA', 'LA', 'P'):
                        frame = gifImage.convert('RGB')
                    else:
                        frame = gifImage

                    # Save as PNG in memory
                    pngBuffer = io.BytesIO()
                    frame.save(pngBuffer, format='PNG')
                    pngData = pngBuffer.getvalue()

                # Capture the original size BEFORE swapping the payload,
                # otherwise the log would report the converted size twice.
                originalSize = len(fileBytes)

                # Update mimeType and fileBytes
                mimeType = "image/png"
                fileBytes = pngData

                logger.info(f"GIF converted to PNG during extraction: {fileName}, original={originalSize} bytes, converted={len(pngData)} bytes")

            except Exception as e:
                # Deliberately broad: conversion is optional, so any failure
                # keeps the original GIF data.
                logger.warning(f"GIF conversion failed during extraction for {fileName}: {str(e)}, using original")

        return [ContentPart(
            id=makeId(),
            parentId=None,
            label="image",
            typeGroup="image",
            mimeType=mimeType,
            data=base64.b64encode(fileBytes).decode("utf-8"),
            metadata={"size": len(fileBytes)}
        )]
|
|
|
|
|