gateway/modules/services/serviceExtraction/extractors/extractorImage.py
2025-12-15 21:55:26 +01:00

77 lines
2.9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any, Dict, List
import base64
import logging
from ..subUtils import makeId
from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Extractor
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ImageExtractor(Extractor):
    """
    Extractor for image files.

    Supported formats:
    - MIME types: image/jpeg, image/png, image/gif, image/webp, image/bmp, image/tiff
    - File extensions: .jpg, .jpeg, .png, .gif, .webp, .bmp, .tiff
    - Special handling: content whose MIME type is image/gif is converted to
      PNG during extraction (the original bytes are kept if conversion fails)
    """

    # Single source of truth for the advertised formats, shared by detect()
    # and the getSupported*() accessors so the lists cannot drift apart.
    _SUPPORTED_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff")
    _SUPPORTED_MIME_TYPES = (
        "image/jpeg",
        "image/png",
        "image/gif",
        "image/webp",
        "image/bmp",
        "image/tiff",
    )

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """
        Return True when the file looks like an image.

        A file matches if its MIME type starts with "image/" (any subtype) or
        its name ends, case-insensitively, with a supported extension.
        None/empty fileName and mimeType are tolerated. headBytes is not
        inspected.
        """
        if (mimeType or "").startswith("image/"):
            return True
        return (fileName or "").lower().endswith(self._SUPPORTED_EXTENSIONS)

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions."""
        # Hand out a fresh list so callers cannot mutate the shared constant.
        return list(self._SUPPORTED_EXTENSIONS)

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        return list(self._SUPPORTED_MIME_TYPES)

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """
        Wrap the image bytes in a single base64-encoded ContentPart.

        Args:
            fileBytes: Raw bytes of the image file.
            context: Extraction context; reads "mimeType" (falls back to
                "image/unknown" when missing or empty) and "fileName"
                (used for log messages only).

        Returns:
            A one-element list with an "image" ContentPart whose data is the
            base64 text of the (possibly converted) bytes and whose metadata
            "size" is the byte length of that same payload.
        """
        mimeType = context.get("mimeType") or "image/unknown"
        fileName = context.get("fileName", "")

        # Convert GIF to PNG during extraction
        if mimeType.lower() == "image/gif":
            try:
                # Imported lazily so a missing Pillow install only disables the
                # conversion (handled by the except below) instead of breaking
                # the import of this module.
                from PIL import Image
                import io

                with Image.open(io.BytesIO(fileBytes)) as img:
                    # Normalise alpha/palette modes to plain RGB before saving.
                    # NOTE(review): this discards any transparency in the GIF -
                    # confirm that is intended, since PNG could keep it.
                    if img.mode in ("RGBA", "LA", "P"):
                        frame = img.convert("RGB")
                    else:
                        frame = img
                    # save_all is not requested, so the PNG is a single still
                    # image and any GIF animation is dropped.
                    pngBuffer = io.BytesIO()
                    frame.save(pngBuffer, format="PNG")

                pngData = pngBuffer.getvalue()
                # Capture the original size BEFORE rebinding fileBytes;
                # otherwise the log would report the converted size twice.
                originalSize = len(fileBytes)
                mimeType = "image/png"
                fileBytes = pngData
                logger.info(
                    "GIF converted to PNG during extraction: %s, original=%d bytes, converted=%d bytes",
                    fileName,
                    originalSize,
                    len(pngData),
                )
            except Exception as e:
                # Deliberate best-effort: any failure (Pillow missing, corrupt
                # data, ...) keeps the original GIF bytes and MIME type.
                logger.warning(
                    "GIF conversion failed during extraction for %s: %s, using original",
                    fileName,
                    e,
                )

        return [ContentPart(
            id=makeId(),
            parentId=None,
            label="image",
            typeGroup="image",
            mimeType=mimeType,
            data=base64.b64encode(fileBytes).decode("utf-8"),
            metadata={"size": len(fileBytes)},
        )]