57 lines
2.1 KiB
Python
57 lines
2.1 KiB
Python
from typing import Any, Dict, List
|
|
import base64
|
|
import logging
|
|
|
|
from ..subUtils import makeId
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
from ..subRegistry import Extractor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ImageExtractor(Extractor):
    """Extractor that turns an image file into a single base64 ContentPart.

    GIF input is re-encoded as PNG on a best-effort basis; every other
    image type is passed through unchanged.
    """

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when ``mimeType`` starts with ``image/``.

        ``fileName`` and ``headBytes`` are not consulted. A ``None`` or empty
        ``mimeType`` yields False.
        """
        return (mimeType or "").startswith("image/")

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Wrap the image bytes in one ContentPart with base64-encoded data.

        Args:
            fileBytes: Raw bytes of the image file.
            context: Extraction context. Reads ``"mimeType"`` (falls back to
                ``"image/unknown"`` when missing or empty) and ``"fileName"``
                (used only in log messages).

        Returns:
            A one-element list. For GIF input that converts successfully the
            part carries PNG bytes and ``image/png``; otherwise it carries the
            original bytes and mime type. ``metadata["size"]`` is the byte
            length of whatever data is emitted.
        """
        mimeType = context.get("mimeType") or "image/unknown"
        fileName = context.get("fileName", "")

        if mimeType.lower() == "image/gif":
            # Record the size up front: fileBytes is rebound to the PNG data
            # below, and the log line must still report the real GIF size.
            originalSize = len(fileBytes)
            try:
                # Imported lazily so the module loads without Pillow; an
                # ImportError lands in the fallback branch below.
                from PIL import Image
                import io

                with Image.open(io.BytesIO(fileBytes)) as img:
                    # Flatten palette/alpha modes to plain RGB. Note that this
                    # discards any transparency the GIF had.
                    if img.mode in ('RGBA', 'LA', 'P'):
                        img = img.convert('RGB')

                    # A plain PNG save writes a single frame, so an animated
                    # GIF ends up as a still image.
                    pngBuffer = io.BytesIO()
                    img.save(pngBuffer, format='PNG')
                    pngData = pngBuffer.getvalue()

                # Only swap in the converted data once the save has succeeded,
                # so a failure part-way leaves the original GIF untouched.
                mimeType = "image/png"
                fileBytes = pngData

                logger.info(
                    "GIF converted to PNG during extraction: %s, "
                    "original=%s bytes, converted=%s bytes",
                    fileName,
                    originalSize,
                    len(pngData),
                )
            except Exception as e:
                # Deliberate best effort: any failure (Pillow missing, corrupt
                # GIF, ...) keeps the original GIF bytes and mime type.
                logger.warning(
                    "GIF conversion failed during extraction for %s: %s, using original",
                    fileName,
                    e,
                )

        return [ContentPart(
            id=makeId(),
            parentId=None,
            label="image",
            typeGroup="image",
            mimeType=mimeType,
            data=base64.b64encode(fileBytes).decode("utf-8"),
            metadata={"size": len(fileBytes)},
        )]
|
|
|
|
|