from typing import Any, Dict, List, Optional, Tuple
import base64
import io

from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Chunker

# Image modes Pillow's JPEG writer accepts directly; anything else (RGBA, P,
# LA, ...) must be converted before saving or `Image.save` raises OSError.
_JPEG_SAFE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})


class ImageChunker(Chunker):
    """Chunker for reducing image size through resizing, compression, and tiling."""

    def chunk(self, part: ContentPart, options: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Chunk an image by reducing its size through various strategies.

        Strategies are tried in order:
          1. "original"   - payload and pixel count are already within limits.
          2. resize       - scale down (aspect ratio kept) to the pixel limit.
          3. "compressed" - re-encode as JPEG, lowering quality in steps of 10
                            (never below 11) until the payload fits.
          4. "tiled"      - split into tiles when no quality step fits.
        Any exception (undecodable data, Pillow missing, invalid options, ...)
        yields a single "error_fallback" chunk carrying the untouched payload.

        All sizes are measured on the base64-encoded payload, not raw bytes.

        Args:
            part: ContentPart containing image data (base64 encoded)
            options: Chunking options including:
                - imageChunkSize: Maximum size in bytes for each chunk
                - imageMaxPixels: Maximum pixels (width*height) for the image
                - imageQuality: JPEG quality (0-100, default 85)
                - imageTileSize: Size for tiling if image is still too large

        Returns:
            List of image chunks with reduced size. Each chunk is a dict with
            "data" (base64), "size", "order" and "metadata" keys.
        """
        maxBytes = int(options.get("imageChunkSize", 1000000))  # 1MB default
        maxPixels = int(options.get("imageMaxPixels", 1024 * 1024))  # 1MP default
        quality = int(options.get("imageQuality", 85))
        tileSize = int(options.get("imageTileSize", 512))  # 512x512 tiles

        # Measured once up front so the error fallback below cannot itself
        # fail on len(None) when the part carries no data.
        originalSize = len(part.data) if part.data else 0

        try:
            # Lazy import PIL to avoid hanging during module import
            from PIL import Image

            image = Image.open(io.BytesIO(base64.b64decode(part.data)))
            originalWidth, originalHeight = image.size
            originalPixels = originalWidth * originalHeight

            # Strategy 1: If image is small enough, return as-is
            if originalSize <= maxBytes and originalPixels <= maxPixels:
                return [{
                    "data": part.data,
                    "size": originalSize,
                    "order": 0,
                    "metadata": {
                        "originalSize": originalSize,
                        "originalPixels": originalPixels,
                        "strategy": "original",
                    },
                }]

            # Every remaining strategy writes JPEG, which cannot store alpha
            # or palette images.
            image = self._toJpegCompatible(image)

            # Strategy 2: Resize to fit within pixel limit
            if originalPixels > maxPixels:
                scale = (maxPixels / originalPixels) ** 0.5
                # Keep at least 64px per side, but never enlarge a side that
                # was already smaller than that in the source image.
                newWidth = max(int(originalWidth * scale), min(originalWidth, 64))
                newHeight = max(int(originalHeight * scale), min(originalHeight, 64))
                # Pillow < 9.1 exposes LANCZOS on the Image module itself.
                resample = getattr(Image, "Resampling", Image).LANCZOS
                image = image.resize((newWidth, newHeight), resample)

            # Strategy 3: Compress with quality reduction. Always attempted,
            # so an image that was only resized is still emitted at the
            # requested quality instead of a hard-coded minimum.
            encoded, usedQuality = self._compressToFit(image, maxBytes, quality)
            if len(encoded) <= maxBytes:
                return [{
                    "data": encoded,
                    "size": len(encoded),
                    "order": 0,
                    "metadata": {
                        "originalSize": originalSize,
                        "originalPixels": originalPixels,
                        "compressedSize": len(encoded),
                        "quality": usedQuality,
                        "strategy": "compressed",
                    },
                }]

            # Strategy 4: Tile the image if still too large
            return self._tileImage(
                image, maxBytes, tileSize, quality, originalPixels,
                originalSize=originalSize,
            )

        except Exception as e:
            # Best-effort fallback: hand back the original data and record
            # why it could not be reduced.
            return [{
                "data": part.data,
                "size": originalSize,
                "order": 0,
                "metadata": {
                    "originalSize": originalSize,
                    "strategy": "error_fallback",
                    "error": str(e),
                },
            }]

    @staticmethod
    def _toJpegCompatible(image: Any) -> Any:
        """Return the image in a mode the JPEG writer accepts (RGB if it needs converting)."""
        if image.mode in _JPEG_SAFE_MODES:
            return image
        return image.convert("RGB")

    @staticmethod
    def _encodeJpeg(image: Any, quality: int) -> str:
        """Encode the image as JPEG at the given quality and return it base64-encoded."""
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG", quality=quality, optimize=True)
        return base64.b64encode(buffer.getvalue()).decode("utf-8")

    @classmethod
    def _compressToFit(cls, image: Any, maxBytes: int, quality: int) -> Tuple[str, int]:
        """
        Encode the image as JPEG, lowering quality until it fits maxBytes.

        Quality starts at `quality` and drops in steps of 10 while it stays
        above 10; a starting quality of 10 or less is tried exactly once.

        Returns:
            (base64 payload, quality used). When no step fits, this is the
            last (lowest-quality) attempt, so callers must compare the
            payload length against maxBytes themselves.
        """
        qualitySteps = list(range(quality, 10, -10)) or [quality]
        encoded = ""
        usedQuality = quality
        for usedQuality in qualitySteps:
            encoded = cls._encodeJpeg(image, usedQuality)
            if len(encoded) <= maxBytes:
                break
        return encoded, usedQuality

    def _tileImage(self, image: Any, maxBytes: int, tileSize: int, quality: int,
                   originalPixels: int, originalSize: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Split image into tiles if it's still too large after compression.

        Tiles are produced row by row (left to right, top to bottom); tiles on
        the right and bottom edges may be smaller than tileSize. Each tile is
        JPEG-encoded, with quality lowered per tile when needed to fit
        maxBytes. A tile that does not fit at any step is still emitted at the
        lowest quality tried, so no image region is dropped.

        Args:
            image: PIL image to split.
            maxBytes: Target maximum base64 size for each tile.
            tileSize: Edge length of a tile in pixels; must be positive.
            quality: Starting JPEG quality for each tile.
            originalPixels: Pixel count of the source image (metadata only).
            originalSize: Size of the source payload (metadata only). When
                omitted, the raw pixel buffer size of `image` is reported.

        Returns:
            List of tile chunks whose "order" is the row-major tile index.

        Raises:
            ValueError: If tileSize is not positive.
        """
        if tileSize <= 0:
            # A non-positive tile size would divide by zero or silently
            # produce no tiles at all, dropping the image.
            raise ValueError("imageTileSize must be a positive integer")

        image = self._toJpegCompatible(image)
        width, height = image.size

        # Computed once; the pixel buffer dump is expensive and identical for
        # every tile.
        if originalSize is None:
            originalSize = len(image.tobytes())

        # Calculate tile grid (ceiling division so edge remainders get a tile)
        tilesX = (width + tileSize - 1) // tileSize
        tilesY = (height + tileSize - 1) // tileSize

        chunks: List[Dict[str, Any]] = []
        for y in range(tilesY):
            top = y * tileSize
            bottom = min(top + tileSize, height)
            for x in range(tilesX):
                left = x * tileSize
                right = min(left + tileSize, width)

                tile = image.crop((left, top, right, bottom))
                tileB64, tileQuality = self._compressToFit(tile, maxBytes, quality)

                chunks.append({
                    "data": tileB64,
                    "size": len(tileB64),
                    "order": y * tilesX + x,
                    "metadata": {
                        "originalSize": originalSize,
                        "originalPixels": originalPixels,
                        "tileSize": tileSize,
                        "tilePosition": f"{x},{y}",
                        "tileBounds": f"{left},{top},{right},{bottom}",
                        "quality": tileQuality,
                        "strategy": "tiled",
                    },
                })

        return chunks