# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

from typing import Any, Dict, List
import logging

from modules.datamodels.datamodelExtraction import ContentPart
from ..subRegistry import Chunker

logger = logging.getLogger(__name__)


class TextChunker(Chunker):
    """Chunker that splits plain text into byte-size-bounded chunks.

    Splitting is line-oriented: whole lines are packed into a chunk until
    adding the next line would exceed the byte budget. A single line that
    is itself larger than the budget is split at UTF-8 character
    boundaries so no bytes are ever silently dropped.
    """

    def chunk(self, part: ContentPart, options: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Split ``part.data`` (a str) into chunks of at most ``maxBytes`` UTF-8 bytes.

        Args:
            part: content whose ``data`` attribute holds the text to split.
            options: may contain ``textChunkSize`` (int, bytes; default 40000).

        Returns:
            List of dicts with keys ``data`` (str), ``size`` (exact UTF-8 byte
            length of ``data``), and ``order`` (0-based chunk index).
        """
        # Clamp to >= 4 (the largest UTF-8 code point is 4 bytes): this both
        # prevents an infinite loop on textChunkSize <= 0 and guarantees the
        # boundary back-off below always makes forward progress.
        maxBytes = max(4, int(options.get("textChunkSize", 40000)))
        logger.debug("TextChunker: textChunkSize from options: %s",
                     options.get("textChunkSize", "NOT_FOUND"))
        logger.debug("TextChunker: using maxBytes: %s", maxBytes)

        chunks: List[Dict[str, Any]] = []

        # Split by lines first (preferred method for text)
        lines = part.data.split('\n')
        current: List[str] = []
        size = 0  # running UTF-8 byte size of `current` incl. joining newlines

        for line in lines:
            lineSize = len(line.encode('utf-8')) + 1  # +1 for newline character

            if size + lineSize > maxBytes and current:
                # Current chunk is full, save it and start a new one
                data = '\n'.join(current)
                chunks.append({"data": data,
                               "size": len(data.encode('utf-8')),
                               "order": len(chunks)})
                current = []
                size = 0

            # If a single line is larger than maxBytes, split it at UTF-8
            # character boundaries (never mid-character, so nothing is lost).
            if lineSize > maxBytes:
                lineBytes = line.encode('utf-8')
                total = len(lineBytes)
                start = 0
                while start < total:
                    end = min(start + maxBytes, total)
                    # Back off while `end` points at a UTF-8 continuation
                    # byte (0b10xxxxxx) — i.e. we'd cut a character in half.
                    # maxBytes >= 4 ensures end stays > start.
                    while end < total and end > start and (lineBytes[end] & 0xC0) == 0x80:
                        end -= 1
                    chunkText = lineBytes[start:end].decode('utf-8')
                    # `size` is now exactly the bytes stored in `data`
                    # (the old errors='ignore' decode could drop bytes and
                    # over-report the size).
                    chunks.append({"data": chunkText,
                                   "size": end - start,
                                   "order": len(chunks)})
                    start = end
                # Don't add this line to `current`; it's already chunked.
                continue

            # Add line to the current chunk
            current.append(line)
            size += lineSize

        # Flush remaining lines as the final chunk
        if current:
            data = '\n'.join(current)
            chunks.append({"data": data,
                           "size": len(data.encode('utf-8')),
                           "order": len(chunks)})

        logger.debug("TextChunker: Created %s chunks, total input size: %s bytes",
                     len(chunks), len(part.data.encode('utf-8')))
        return chunks