# 182 lines, 7.3 KiB, Python
from typing import Any, Dict, List

import base64
import io

from modules.datamodels.datamodelExtraction import ContentPart

from ..subRegistry import Chunker


class ImageChunker(Chunker):
    """Chunker for reducing image size through resizing, compression, and tiling.

    Strategies are tried in order of increasing destructiveness:
      1. pass-through (already within byte and pixel budgets)
      2. downscale to fit the pixel budget (aspect ratio preserved)
      3. JPEG re-encode, stepping quality down until under the byte budget
      4. split into fixed-size tiles, each an independent chunk

    All failures are swallowed: on any error the original payload is returned
    in a single chunk with ``strategy: "error_fallback"``.
    """

    def chunk(self, part: ContentPart, options: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Chunk an image by reducing its size through various strategies.

        Args:
            part: ContentPart containing image data (base64 encoded)
            options: Chunking options including:
                - imageChunkSize: Maximum size in bytes for each chunk
                - imageMaxPixels: Maximum pixels (width*height) for the image
                - imageQuality: JPEG quality (0-100, default 85)
                - imageTileSize: Size for tiling if image is still too large

        Returns:
            List of image chunks with reduced size. Each chunk is a dict with
            ``data`` (base64 str), ``size``, ``order`` and ``metadata`` keys.
        """
        maxBytes = int(options.get("imageChunkSize", 1000000))  # 1MB default
        maxPixels = int(options.get("imageMaxPixels", 1024 * 1024))  # 1MP default
        quality = int(options.get("imageQuality", 85))
        tileSize = int(options.get("imageTileSize", 512))  # 512x512 tiles

        chunks: List[Dict[str, Any]] = []

        try:
            # Lazy import PIL to avoid hanging during module import
            from PIL import Image

            # Decode base64 image data
            imageData = base64.b64decode(part.data)
            image = Image.open(io.BytesIO(imageData))

            # Get original dimensions
            originalWidth, originalHeight = image.size
            originalPixels = originalWidth * originalHeight

            # Strategy 1: If image is small enough, return as-is
            if len(part.data) <= maxBytes and originalPixels <= maxPixels:
                chunks.append({
                    "data": part.data,
                    "size": len(part.data),
                    "order": 0,
                    "metadata": {
                        "originalSize": len(part.data),
                        "originalPixels": originalPixels,
                        "strategy": "original"
                    }
                })
                return chunks

            # Strategy 2: Resize to fit within pixel limit
            resized = originalPixels > maxPixels
            if resized:
                # sqrt of the pixel ratio scales both axes while keeping
                # the aspect ratio and hitting the pixel budget.
                scale = (maxPixels / originalPixels) ** 0.5
                newWidth = max(int(originalWidth * scale), 64)  # floor at 64px
                newHeight = max(int(originalHeight * scale), 64)
                image = image.resize((newWidth, newHeight), Image.Resampling.LANCZOS)

            # JPEG cannot store alpha or palette modes; convert once up front.
            # BUGFIX: the original code raised OSError at save() time for
            # RGBA/P/LA inputs, which the broad except turned into a useless
            # error_fallback chunk.
            if image.mode not in ("RGB", "L"):
                image = image.convert("RGB")

            # Strategy 3: Compress with quality reduction.
            # BUGFIX: a resized image must be re-encoded even when the
            # *original* payload was already under maxBytes; the original
            # code skipped this loop in that case and crushed the image at
            # quality 10 in the fallback below.
            currentSize = len(part.data)
            currentQuality = quality

            while (currentSize > maxBytes or resized) and currentQuality > 10:
                resized = False  # one encode pass reflects the new dimensions
                compressedB64 = self._encodeJpegB64(image, currentQuality)
                currentSize = len(compressedB64)

                if currentSize <= maxBytes:
                    chunks.append({
                        "data": compressedB64,
                        "size": currentSize,
                        "order": 0,
                        "metadata": {
                            "originalSize": len(part.data),
                            "originalPixels": originalPixels,
                            "compressedSize": currentSize,
                            "quality": currentQuality,
                            "strategy": "compressed"
                        }
                    })
                    return chunks

                currentQuality -= 10

            # Strategy 4: Tile the image if still too large
            if currentSize > maxBytes:
                return self._tileImage(image, maxBytes, tileSize, quality, originalPixels)

            # Fallback: only reachable when the requested quality is already
            # <= 10 — emit a minimum-quality encoding even if it is over the
            # byte limit, so the caller always gets something usable.
            compressedB64 = self._encodeJpegB64(image, 10)
            chunks.append({
                "data": compressedB64,
                "size": len(compressedB64),
                "order": 0,
                "metadata": {
                    "originalSize": len(part.data),
                    "originalPixels": originalPixels,
                    "compressedSize": len(compressedB64),
                    "quality": 10,
                    "strategy": "fallback_compressed"
                }
            })

        except Exception as e:
            # Never raise out of chunking: hand the original bytes back with
            # the failure recorded so the caller can decide what to do.
            chunks.append({
                "data": part.data,
                "size": len(part.data),
                "order": 0,
                "metadata": {
                    "originalSize": len(part.data),
                    "strategy": "error_fallback",
                    "error": str(e)
                }
            })

        return chunks

    def _encodeJpegB64(self, image: Any, quality: int) -> str:
        """Encode a PIL image as optimized JPEG and return it base64-encoded."""
        output = io.BytesIO()
        image.save(output, format='JPEG', quality=quality, optimize=True)
        return base64.b64encode(output.getvalue()).decode('utf-8')

    def _tileImage(self, image: Any, maxBytes: int, tileSize: int, quality: int, originalPixels: int) -> List[Dict[str, Any]]:
        """Split image into tiles if it's still too large after compression.

        Args:
            image: PIL image to tile (already resized/converted by chunk()).
            maxBytes: Byte budget — kept for interface compatibility; individual
                tiles are not re-checked against it.
            tileSize: Edge length of each (square) tile; border tiles may be smaller.
            quality: JPEG quality used for every tile.
            originalPixels: Pixel count of the pre-resize image, echoed in metadata.

        Returns:
            One chunk per tile, ordered row-major (``order = y * tilesX + x``).
        """
        chunks: List[Dict[str, Any]] = []
        width, height = image.size

        # Ceiling division: number of tiles along each axis.
        tilesX = (width + tileSize - 1) // tileSize
        tilesY = (height + tileSize - 1) // tileSize

        for y in range(tilesY):
            for x in range(tilesX):
                # Tile boundaries, clamped to the image edges.
                left = x * tileSize
                top = y * tileSize
                right = min(left + tileSize, width)
                bottom = min(top + tileSize, height)

                tile = image.crop((left, top, right, bottom))

                # BUGFIX: convert alpha/palette modes — JPEG save would
                # otherwise raise OSError for such tiles.
                if tile.mode not in ("RGB", "L"):
                    tile = tile.convert("RGB")

                tileB64 = self._encodeJpegB64(tile, quality)

                chunks.append({
                    "data": tileB64,
                    "size": len(tileB64),
                    "order": y * tilesX + x,
                    "metadata": {
                        # NOTE: raw pixel-buffer size of the full image, not
                        # the original base64 payload — kept for compatibility.
                        "originalSize": len(image.tobytes()),
                        "originalPixels": originalPixels,
                        "tileSize": tileSize,
                        "tilePosition": f"{x},{y}",
                        "tileBounds": f"{left},{top},{right},{bottom}",
                        "quality": quality,
                        "strategy": "tiled"
                    }
                })

        return chunks