gateway/modules/services/serviceExtraction/chunking/chunkerImage.py
2025-12-15 21:55:26 +01:00

184 lines
7.3 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any, Dict, List
import base64
import io
from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Chunker
class ImageChunker(Chunker):
"""Chunker for reducing image size through resizing, compression, and tiling."""
def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]:
"""
Chunk an image by reducing its size through various strategies.
Args:
part: ContentPart containing image data (base64 encoded)
options: Chunking options including:
- imageChunkSize: Maximum size in bytes for each chunk
- imageMaxPixels: Maximum pixels (width*height) for the image
- imageQuality: JPEG quality (0-100, default 85)
- imageTileSize: Size for tiling if image is still too large
Returns:
List of image chunks with reduced size
"""
maxBytes = int(options.get("imageChunkSize", 1000000)) # 1MB default
maxPixels = int(options.get("imageMaxPixels", 1024 * 1024)) # 1MP default
quality = int(options.get("imageQuality", 85))
tileSize = int(options.get("imageTileSize", 512)) # 512x512 tiles
chunks: List[Dict[str, Any]] = []
try:
# Lazy import PIL to avoid hanging during module import
from PIL import Image
# Decode base64 image data
imageData = base64.b64decode(part.data)
image = Image.open(io.BytesIO(imageData))
# Get original dimensions
originalWidth, originalHeight = image.size
originalPixels = originalWidth * originalHeight
# Strategy 1: If image is small enough, return as-is
if len(part.data) <= maxBytes and originalPixels <= maxPixels:
chunks.append({
"data": part.data,
"size": len(part.data),
"order": 0,
"metadata": {
"originalSize": len(part.data),
"originalPixels": originalPixels,
"strategy": "original"
}
})
return chunks
# Strategy 2: Resize to fit within pixel limit
if originalPixels > maxPixels:
# Calculate new dimensions maintaining aspect ratio
scale = (maxPixels / originalPixels) ** 0.5
newWidth = int(originalWidth * scale)
newHeight = int(originalHeight * scale)
# Ensure minimum size
newWidth = max(newWidth, 64)
newHeight = max(newHeight, 64)
image = image.resize((newWidth, newHeight), Image.Resampling.LANCZOS)
# Strategy 3: Compress with quality reduction
currentSize = len(part.data)
currentQuality = quality
while currentSize > maxBytes and currentQuality > 10:
# Compress image
output = io.BytesIO()
image.save(output, format='JPEG', quality=currentQuality, optimize=True)
compressedData = output.getvalue()
compressedB64 = base64.b64encode(compressedData).decode('utf-8')
currentSize = len(compressedB64)
if currentSize <= maxBytes:
chunks.append({
"data": compressedB64,
"size": currentSize,
"order": 0,
"metadata": {
"originalSize": len(part.data),
"originalPixels": originalPixels,
"compressedSize": currentSize,
"quality": currentQuality,
"strategy": "compressed"
}
})
return chunks
currentQuality -= 10
# Strategy 4: Tile the image if still too large
if currentSize > maxBytes:
chunks = self._tileImage(image, maxBytes, tileSize, quality, originalPixels)
return chunks
# Fallback: Return compressed version even if over limit
output = io.BytesIO()
image.save(output, format='JPEG', quality=10, optimize=True)
compressedData = output.getvalue()
compressedB64 = base64.b64encode(compressedData).decode('utf-8')
chunks.append({
"data": compressedB64,
"size": len(compressedB64),
"order": 0,
"metadata": {
"originalSize": len(part.data),
"originalPixels": originalPixels,
"compressedSize": len(compressedB64),
"quality": 10,
"strategy": "fallback_compressed"
}
})
except Exception as e:
# Fallback: Return original data with error metadata
chunks.append({
"data": part.data,
"size": len(part.data),
"order": 0,
"metadata": {
"originalSize": len(part.data),
"strategy": "error_fallback",
"error": str(e)
}
})
return chunks
def _tileImage(self, image: Any, maxBytes: int, tileSize: int, quality: int, originalPixels: int) -> List[Dict[str, Any]]:
"""Split image into tiles if it's still too large after compression."""
chunks = []
width, height = image.size
# Calculate tile grid
tilesX = (width + tileSize - 1) // tileSize
tilesY = (height + tileSize - 1) // tileSize
for y in range(tilesY):
for x in range(tilesX):
# Calculate tile boundaries
left = x * tileSize
top = y * tileSize
right = min(left + tileSize, width)
bottom = min(top + tileSize, height)
# Extract tile
tile = image.crop((left, top, right, bottom))
# Compress tile
output = io.BytesIO()
tile.save(output, format='JPEG', quality=quality, optimize=True)
tileData = output.getvalue()
tileB64 = base64.b64encode(tileData).decode('utf-8')
chunks.append({
"data": tileB64,
"size": len(tileB64),
"order": y * tilesX + x,
"metadata": {
"originalSize": len(image.tobytes()),
"originalPixels": originalPixels,
"tileSize": tileSize,
"tilePosition": f"{x},{y}",
"tileBounds": f"{left},{top},{right},{bottom}",
"quality": quality,
"strategy": "tiled"
}
})
return chunks